Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Comparing changes

Choose two branches to see what's changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
  • 8 commits
  • 21 files changed
  • 7 commit comments
  • 2 contributors
Commits on Jun 11, 2012
Jim Carroll Readding the refactor now that the tests are fixed. This refactor inc…
…ludes:

1) Moving the relationship between the MpCluster manager and the RoutingStrategy down into the RoutingStrategy rather than having the Router be an intermediary.
2) More robust handling of issues with communicating with the MpCluster manager.
3) Test log tuning
cfb699a
Jim Carroll Fixed build with maven 2 surefire runner that assumed any class named…
… with Test* was a test class.
33bedf6
Jim Carroll Merge pull request #45 from jimfcarroll/router-refactor-take4
Router refactor take4
a041d84
Commits on Jun 14, 2012
Jim Carroll Backed out the previous test hook changes and incorporated a differen…
…t approach.
a2f4960
Jim Carroll Make sure MpClusterSessions that are opened for testing purposes are …
…cleaned up.
646e1f5
Jim Carroll Merge pull request #46 from jimfcarroll/router-refactor-take5
Backed out the previous test hook changes and incorporated a different a...
9183ed9
Commits on Jun 15, 2012
Jim Carroll [fix] minor fix to evict test by increasing the timeout. Also catch a…
…nd log possible exceptions.
9f53dee
@catalincapota catalincapota Merge pull request #47 from jimfcarroll/minor-evict-test-fix
[fix] minor fix to evict test by increasing the timeout. Also catch and ...
952c066
Showing with 863 additions and 619 deletions.
  1. +11 −0 lib-dempsyapi/src/main/java/com/nokia/dempsy/config/ApplicationDefinition.java
  2. +1 −0  lib-dempsyapi/src/main/java/com/nokia/dempsy/config/ClusterDefinition.java
  3. +1 −1  lib-dempsycore/src/main/java/com/nokia/dempsy/mpcluster/MpCluster.java
  4. +85 −18 lib-dempsycore/src/main/java/com/nokia/dempsy/router/RoutingStrategy.java
  5. +54 −75 lib-dempsyimpl/src/main/java/com/nokia/dempsy/Dempsy.java
  6. +15 −1 lib-dempsyimpl/src/main/java/com/nokia/dempsy/container/MpContainer.java
  7. +2 −0  lib-dempsyimpl/src/main/java/com/nokia/dempsy/messagetransport/tcp/TcpDestination.java
  8. +40 −1 lib-dempsyimpl/src/main/java/com/nokia/dempsy/mpcluster/invm/LocalVmMpClusterSessionFactory.java
  9. +126 −102 lib-dempsyimpl/src/main/java/com/nokia/dempsy/mpcluster/zookeeper/ZookeeperSession.java
  10. +264 −61 lib-dempsyimpl/src/main/java/com/nokia/dempsy/router/DefaultRoutingStrategy.java
  11. +102 −121 lib-dempsyimpl/src/main/java/com/nokia/dempsy/router/Router.java
  12. +17 −29 lib-dempsyimpl/src/test/java/com/nokia/dempsy/TestDempsy.java
  13. +82 −0 lib-dempsyimpl/src/test/java/com/nokia/dempsy/TestUtils.java
  14. +4 −3 lib-dempsyimpl/src/test/java/com/nokia/dempsy/container/TestMpContainer.java
  15. +2 −13 lib-dempsyimpl/src/test/java/com/nokia/dempsy/mpcluster/TestAllMpClusterImpls.java
  16. +12 −23 lib-dempsyimpl/src/test/java/com/nokia/dempsy/mpcluster/zookeeper/TestFullApp.java
  17. +7 −14 lib-dempsyimpl/src/test/java/com/nokia/dempsy/mpcluster/zookeeper/TestZookeeperClusterResilience.java
  18. +0 −99 lib-dempsyimpl/src/test/java/com/nokia/dempsy/router/MpClusterTestImpl.java
  19. +30 −50 lib-dempsyimpl/src/test/java/com/nokia/dempsy/router/TestRouterClusterManagement.java
  20. +5 −5 lib-dempsyimpl/src/test/resources/log4j.properties
  21. +3 −3 lib-dempsyimpl/src/test/resources/testDempsy/SinglestageOutputApplicationActx.xml
View
11 lib-dempsyapi/src/main/java/com/nokia/dempsy/config/ApplicationDefinition.java
@@ -121,6 +121,17 @@ public ApplicationDefinition setClusterDefinitions(List<ClusterDefinition> clust
public List<ClusterDefinition> getClusterDefinitions() { return Collections.unmodifiableList(clusterDefinitions); }
/**
+ * Get the {@link ClusterDefinition} that corresponds to the given clusterId.
+ */
+ public ClusterDefinition getClusterDefinition(ClusterId clusterId)
+ {
+ for (ClusterDefinition cur : clusterDefinitions)
+ if (cur.getClusterId().equals(clusterId))
+ return cur;
+ return null;
+ }
+
+ /**
* When configuring by hand, this method
* @param clusterDefinitions is the {@link ClusterDefinition}s that make
* up this Application.
View
1  lib-dempsyapi/src/main/java/com/nokia/dempsy/config/ClusterDefinition.java
@@ -216,6 +216,7 @@ public void validate() throws DempsyException
if (messageProcessorPrototype != null && adaptor != null)
throw new DempsyException("A dempsy cluster must contain either an 'adaptor' or a message processor prototype but not both. " +
clusterId + " appears to be configured with both.");
+
if (messageProcessorPrototype != null)
{
View
2  lib-dempsycore/src/main/java/com/nokia/dempsy/mpcluster/MpCluster.java
@@ -74,7 +74,7 @@
* Every MpCluster instance participating in a cluster will have the
* same cluster Id, which identifies the total set of Mps of the same prototype.
*/
- public ClusterId getClusterId() throws MpClusterException;
+ public ClusterId getClusterId();
/**
* Sets the cluster level data.
View
103 lib-dempsycore/src/main/java/com/nokia/dempsy/router/RoutingStrategy.java
@@ -16,16 +16,17 @@
package com.nokia.dempsy.router;
-import java.util.List;
+import java.util.Collection;
import com.nokia.dempsy.DempsyException;
import com.nokia.dempsy.annotations.MessageKey;
import com.nokia.dempsy.annotations.MessageProcessor;
import com.nokia.dempsy.config.ApplicationDefinition;
import com.nokia.dempsy.config.ClusterDefinition;
+import com.nokia.dempsy.config.ClusterId;
import com.nokia.dempsy.messagetransport.Destination;
import com.nokia.dempsy.mpcluster.MpCluster;
-import com.nokia.dempsy.mpcluster.MpClusterException;
+import com.nokia.dempsy.router.RoutingStrategy.Outbound.Coordinator;
/**
* <p>A {@link RoutingStrategy} is responsible for determining how to find the appropriate
@@ -72,15 +73,56 @@
*/
public static interface Outbound
{
- public SlotInformation selectSlotForMessageKey(Object messageKey) throws DempsyException;
+ /**
+ * This method needs to be implemented to determine the specific node that the outgoing
+ * message is to be routed to.
+ *
+ * @param messageKey is the message key for the message to be routed
+ * @param message is the message to be routed.
+ * @return a transport Destination indicating the unique node in the downstream cluster
+ * that the message should go to.
+ * @throws DempsyException when something distasteful happens.
+ */
+ public Destination selectDestinationForMessage(Object messageKey, Object message) throws DempsyException;
+
+ /**
+ * The {@link Outbound} is responsible for providing the {@link ClusterId} for which it is the
+ * {@link Outbound} for.
+ */
+ public ClusterId getClusterId();
+
+ /**
+ * <p>Each node can have many Outbound instances and those Outbound cluster references
+ * can come and go. In order to tell Dempsy about what's going on in the cluster
+ * the Outbound should be updating the state of the OutboundCoordinator.</p>
+ *
+ * <p>Implementors of the RoutingStrategy do not need to implement this interface.
+ * There is only one implementation and that instance will be supplied by the
+ * framework.</p>
+ */
+ public static interface Coordinator
+ {
+ /**
+ * registers the outbound with the Coordinator and provide what types the destination
+ * cluster can handle. Note that the Outbound is allowed to call registerOutbound
+ * more than once, without calling unregisterOutbound first but the results should
+ * be the same.
+ */
+ public void registerOutbound(Outbound outbound, Collection<Class<?>> classes);
+
+ /**
+ * registers the outbound with the Coordinator and provide what types the destination
+ * cluster can handle.
+ */
+ public void unregisterOutbound(Outbound outbound);
+ }
+
/**
- * resetCluster is called when the cluster for the Outbound side changes. In this
- * way implementations of this class do not need to be MpClusterWatchers
- * @param cluster - the cluster handle containing the new state.
- * @throws MpClusterException when the implementation has a problem accessing the cluster
+ * Shut down and reclaim any resources associated with the {@link Outbound} instance.
*/
- public void resetCluster(MpCluster<ClusterInformation, SlotInformation> cluster) throws MpClusterException;
+ public void stop();
+
}
/**
@@ -93,21 +135,46 @@
public static interface Inbound
{
/**
- * <p>resetCluster is called when the cluster for the Inbound side changes. In this
- * way implementations of this class do not need to be MpClusterWatchers.</p>
- *
- * @param cluster - the cluster handle containing the new state.
- * @throws MpClusterException when the implementation has a problem accessing the cluster
+ * Since the {@link Inbound} has the responsibility to determine which instances of a
+ * {@link MessageProcessor} are valid in 'this' node, it should be able to privide that
+ * information through the implementataion of this method. This is used as part of the
+ * Pre-instantiation phase of the Message Processor's lifecylce.
*/
- public void resetCluster(MpCluster<ClusterInformation, SlotInformation> cluster,
- List<Class<?>> messageTypes, Destination thisDestination) throws MpClusterException;
-
public boolean doesMessageKeyBelongToNode(Object messageKey);
+
+ /**
+ * Shut down and reclaim any resources associated with the {@link Inbound} instance.
+ */
+ public void stop();
}
- public Inbound createInbound();
+ /**
+ * This method will be called from the Dempsy framework in order to instantiate the one Inbound for
+ * 'this' node. Keep in mind that when running in LocalVm mode there can be more than one inbound per
+ * process.
+ *
+ * @param cluster is the cluster information manager handle for 'this' node.
+ * @param messageTypes is the types of messages that Dempsy determined could be handled by the {@link MessageProcessor}
+ * in this node. This information should be shared across to the {@link Outbound} and registered with
+ * the {@link Coordinator} to allow Dempsy to restrict messages to the appropriate types.
+ * @param thisDestination is the transport Destination instance that represents how to communicate
+ * with 'this' node.
+ * @return the {@link Inbound} instance.
+ */
+ public Inbound createInbound(MpCluster<ClusterInformation, SlotInformation> cluster, Collection<Class<?>> messageTypes, Destination thisDestination);
- public Outbound createOutbound();
+ /**
+ * The RoutingStrategy needs to create an {@link Outbound} that corresponds to the given cluster. It should do this
+ * in a manner that absolutely succeeds even if the cluster information manager is in a bad state. This is why
+ * this method takes stable parameters and throws no exception.
+ *
+ * @param coordinator is the coordinator that the newly created {@link Outbound} can use to call back on the
+ * framework.
+ * @param clusterId is the cluster id that the {@link Outbound} is being created for.
+ * @return a new {@link Outbound} that manages the selection of a {@link Destination} given a message destined for
+ * the given cluster.
+ */
+ public Outbound createOutbound(Outbound.Coordinator coordinator, MpCluster<ClusterInformation, SlotInformation> cluster);
}
View
129 lib-dempsyimpl/src/main/java/com/nokia/dempsy/Dempsy.java
@@ -34,6 +34,7 @@
import com.nokia.dempsy.container.ContainerException;
import com.nokia.dempsy.container.MpContainer;
import com.nokia.dempsy.internal.util.SafeString;
+import com.nokia.dempsy.messagetransport.Destination;
import com.nokia.dempsy.messagetransport.Receiver;
import com.nokia.dempsy.messagetransport.Transport;
import com.nokia.dempsy.monitoring.StatsCollector;
@@ -42,7 +43,6 @@
import com.nokia.dempsy.mpcluster.MpClusterException;
import com.nokia.dempsy.mpcluster.MpClusterSession;
import com.nokia.dempsy.mpcluster.MpClusterSessionFactory;
-import com.nokia.dempsy.mpcluster.MpClusterWatcher;
import com.nokia.dempsy.output.OutputExecuter;
import com.nokia.dempsy.router.ClusterInformation;
import com.nokia.dempsy.router.CurrentClusterCheck;
@@ -78,7 +78,7 @@
* Currently a Node is instantiated within the Dempsy orchestrator as a one to one with the
* {@link Cluster}.
*/
- public class Node implements MpClusterWatcher
+ public class Node
{
protected ClusterDefinition clusterDefinition;
@@ -119,37 +119,34 @@ private void start() throws DempsyException
container.setSerializer(serializer);
// there is only a reciever if we have an Mp (that is, we aren't an adaptor) and start accepting messages
+ Destination thisDestination = null;
if (messageProcessorPrototype != null && acceptedMessageClasses != null && acceptedMessageClasses.size() > 0)
{
receiver = transport.createInbound();
receiver.setListener(container);
+ thisDestination = receiver.getDestination();
}
StatsCollectorFactory statsFactory = (StatsCollectorFactory)clusterDefinition.getStatsCollectorFactory();
if (statsFactory != null)
{
statsCollector = statsFactory.createStatsCollector(currentClusterId,
- receiver != null ? receiver.getDestination() : null);
+ receiver != null ? thisDestination : null);
router.setStatsCollector(statsCollector);
container.setStatCollector(statsCollector);
}
RoutingStrategy strategy = (RoutingStrategy)clusterDefinition.getRoutingStrategy();
+ currentClusterHandle = clusterSession.getCluster(currentClusterId);
+
// there is only an inbound strategy if we have an Mp (that is, we aren't an adaptor) and
// we actually accept messages
if (messageProcessorPrototype != null && acceptedMessageClasses != null && acceptedMessageClasses.size() > 0)
- strategyInbound = strategy.createInbound();
-
- currentClusterHandle = clusterSession.getCluster(currentClusterId);
+ strategyInbound = strategy.createInbound(currentClusterHandle,acceptedMessageClasses, thisDestination);
// this can fail because of down cluster manager server ... but it should eventually recover.
- try
- {
- if (strategyInbound != null && receiver != null)
- strategyInbound.resetCluster(currentClusterHandle, acceptedMessageClasses, receiver.getDestination());
- router.initialize();
- }
+ try { router.initialize(); }
catch (MpClusterException e)
{
logger.warn("Strategy failed to initialize. Continuing anyway. The cluster manager issue will be resolved automatically.",e);
@@ -206,22 +203,6 @@ public void run()
}, "Pre-Instantation Thread");
t.start();
}
-
- // now we want to set the Node as the watcher.
- if (strategyInbound != null && receiver != null)
- {
- currentClusterHandle.addWatcher(this);
-
- // and reset just in case something happened
- try
- {
- strategyInbound.resetCluster(currentClusterHandle, acceptedMessageClasses, receiver.getDestination());
- }
- catch (MpClusterException e)
- {
- logger.warn("Strategy failed to initialize. Continuing anyway. The cluster manager issue will be resolved automatically.",e);
- }
- }
}
catch(RuntimeException e) { throw e; }
catch(Exception e) { throw new DempsyException(e); }
@@ -231,20 +212,6 @@ public void run()
public MpContainer getMpContainer() { return container; }
- @Override
- public void process()
- {
- try
- {
- if (strategyInbound != null)
- strategyInbound.resetCluster(currentClusterHandle, acceptedMessageClasses, receiver.getDestination());
- }
- // TODO: fix these catches... .need to take note of a failure for a later retry
- // using a scheduled task.
- catch(RuntimeException e) { throw e; }
- catch(Exception e) { throw new RuntimeException(e); }
- }
-
public void stop()
{
if (receiver != null)
@@ -268,6 +235,8 @@ public void stop()
if (statsCollector != null)
try { statsCollector.stop(); statsCollector = null;} catch (Throwable th) { logger.error("Problem shutting down node for " + SafeString.valueOf(clusterDefinition), th); }
+ if (strategyInbound != null)
+ try { strategyInbound.stop(); strategyInbound = null;} catch (Throwable th) { logger.error("Problem shutting down node for " + SafeString.valueOf(clusterDefinition), th); }
}
/**
@@ -292,7 +261,7 @@ private void preInitializePrototype(Object prototype) {
} // end Node definition
private List<Node> nodes = new ArrayList<Node>(1);
- private ClusterDefinition clusterDefinition;
+ protected ClusterDefinition clusterDefinition;
private Cluster(ClusterDefinition clusterDefinition)
{
@@ -329,8 +298,8 @@ public void instantiateAndStartAnotherNodeForTesting() throws DempsyException
} // end Cluster Definition
- private ApplicationDefinition applicationDefinition;
- private List<Cluster> appClusters = new ArrayList<Cluster>();
+ protected ApplicationDefinition applicationDefinition;
+ protected List<Cluster> appClusters = new ArrayList<Cluster>();
private List<AdaptorThread> adaptorThreads = new ArrayList<AdaptorThread>();
public Application(ApplicationDefinition applicationDefinition)
@@ -461,9 +430,9 @@ public void stop()
}
private List<ApplicationDefinition> applicationDefinitions = null;
- private List<Application> applications = null;
+ protected List<Application> applications = null;
private CurrentClusterCheck clusterCheck = null;
- private MpClusterSessionFactory<ClusterInformation, SlotInformation> clusterSessionFactory = null;
+ protected MpClusterSessionFactory<ClusterInformation, SlotInformation> clusterSessionFactory = null;
private RoutingStrategy defaultRoutingStrategy = null;
private Serializer<Object> defaultSerializer = null;
private Transport transport = null;
@@ -514,39 +483,49 @@ public synchronized void start() throws DempsyException
if (defaultStatsCollectorFactory == null)
throw new DempsyException("Cannot start this application because there's no default stats collector factory defined.");
-
- applications = new ArrayList<Application>(applicationDefinitions.size());
- for(ApplicationDefinition appDef: this.applicationDefinitions)
+
+ try
{
- appDef.initialize();
- if (clusterCheck.isThisNodePartOfApplication(appDef.getApplicationName()))
+ applications = new ArrayList<Application>(applicationDefinitions.size());
+ for(ApplicationDefinition appDef: this.applicationDefinitions)
{
- Application app = new Application(appDef);
-
- // set the default routing strategy if there isn't one already set.
- if (appDef.getRoutingStrategy() == null)
- appDef.setRoutingStrategy(defaultRoutingStrategy);
-
- if (appDef.getSerializer() == null)
- appDef.setSerializer(defaultSerializer);
-
- if (appDef.getStatsCollectorFactory() == null)
- appDef.setStatsCollectorFactory(defaultStatsCollectorFactory);
-
- applications.add(app);
+ appDef.initialize();
+ if (clusterCheck.isThisNodePartOfApplication(appDef.getApplicationName()))
+ {
+ Application app = new Application(appDef);
+
+ // set the default routing strategy if there isn't one already set.
+ if (appDef.getRoutingStrategy() == null)
+ appDef.setRoutingStrategy(defaultRoutingStrategy);
+
+ if (appDef.getSerializer() == null)
+ appDef.setSerializer(defaultSerializer);
+
+ if (appDef.getStatsCollectorFactory() == null)
+ appDef.setStatsCollectorFactory(defaultStatsCollectorFactory);
+
+ applications.add(app);
+ }
+ }
+
+ boolean clusterStarted = false;
+ for (Application app : applications)
+ clusterStarted = app.start();
+
+ if(!clusterStarted)
+ {
+ throw new DempsyException("Cannot start this application because cluster defination was not found.");
}
+ // if we got to here we can assume we're started
+ synchronized(isRunningEvent) { isRunning = true; }
}
-
- boolean clusterStarted = false;
- for (Application app : applications)
- clusterStarted = app.start();
-
- if(!clusterStarted)
+ catch (RuntimeException rte)
{
- throw new DempsyException("Cannot start this application because cluster defination was not found.");
+ logger.error("Failed to start Dempsy. Attempting to stop.");
+ // if something unpexpected happened then we should attempt to stop
+ try { stop(); } catch (Throwable th) {}
+ throw rte;
}
- // if we got to here we can assume we're started
- synchronized(isRunningEvent) { isRunning = true; }
}
public synchronized void stop()
@@ -628,7 +607,7 @@ public boolean waitToBeStopped(long timeInMillis) throws InterruptedException
}
}
- private static List<Class<?>> getAcceptedMessages(ClusterDefinition clusterDef)
+ protected static List<Class<?>> getAcceptedMessages(ClusterDefinition clusterDef)
{
List<Class<?>> messageClasses = new ArrayList<Class<?>>();
Object prototype = clusterDef.getMessageProcessorPrototype();
View
16 lib-dempsyimpl/src/main/java/com/nokia/dempsy/container/MpContainer.java
@@ -385,7 +385,21 @@ public void evict() {
wrapper.markPassivated();
instances.remove(key);
}
- } catch (Throwable e) {
+ }
+ catch (InvocationTargetException e)
+ {
+ logger.warn("Checking the eviction status/passivating of the Mp " + SafeString.objectDescription(instance) +
+ " resulted in an exception.",e.getCause());
+ }
+ catch (IllegalAccessException e)
+ {
+ logger.warn("It appears that the method for checking the eviction or passivating the Mp " + SafeString.objectDescription(instance) +
+ " is not defined correctly. Is it visible?",e);
+ }
+ catch (RuntimeException e)
+ {
+ logger.warn("Checking the eviction status/passivating of the Mp " + SafeString.objectDescription(instance) +
+ " resulted in an exception.",e);
}
}
} finally {
View
2  lib-dempsyimpl/src/main/java/com/nokia/dempsy/messagetransport/tcp/TcpDestination.java
@@ -47,6 +47,8 @@ public String toString()
@Override
public boolean equals(Object other)
{
+ if (other == null)
+ return false;
TcpDestination otherTcpDestination = (TcpDestination)other;
return inetAddress.equals(otherTcpDestination.inetAddress ) && (port == otherTcpDestination.port);
}
View
41 ...psyimpl/src/main/java/com/nokia/dempsy/mpcluster/invm/LocalVmMpClusterSessionFactory.java
@@ -239,7 +239,46 @@ public void setClusterData(T data)
} // end cluster definition
- private final void callUpdateWatchersForCluster(ClusterId clusterId) { updateClusterWatchers(clusterId); }
+ private volatile boolean inProcess = false;
+ private volatile boolean recursionAttempt = false;
+
+ private final void callUpdateWatchersForCluster(ClusterId clusterId)
+ {
+ // this needs to avoid recursion but if there is a recursion attempt it
+ // needs to execute another process call at the end.
+ LocalVmMpSession.LocalVmMpCluster cluster = (LocalVmMpSession.LocalVmMpCluster)cache.get(clusterId);
+ if (cluster != null)
+ {
+ synchronized(cluster.processLock)
+ {
+ if (inProcess)
+ {
+ recursionAttempt = true;
+ return;
+ }
+
+ do
+ {
+ recursionAttempt = false;
+ inProcess = true;
+
+ for(MpClusterWatcher watcher: cluster.watchers)
+ {
+ try
+ {
+ watcher.process();
+ }
+ catch (RuntimeException e)
+ {
+ logger.error("Failed to handle process for watcher " + SafeString.objectDescription(watcher),e);
+ }
+ }
+ } while (recursionAttempt);
+
+ inProcess = false;
+ }
+ }
+ }
private final void callUpdateWatchersForApplication(String applicationId)
{
View
228 lib-dempsyimpl/src/main/java/com/nokia/dempsy/mpcluster/zookeeper/ZookeeperSession.java
@@ -64,6 +64,7 @@
private Logger logger = LoggerFactory.getLogger(ZookeeperSession.class);
protected volatile AtomicReference<ZooKeeper> zkref;
+ private volatile boolean isRunning = true;
private Map<ClusterId, ZookeeperCluster> cachedClusters = new HashMap<ClusterId, ZookeeperCluster>();
private Map<String, ZookeeperApplication> cachedApps = new HashMap<String, ZookeeperApplication>();
protected long resetDelay = 500;
@@ -137,6 +138,7 @@ public void stop()
AtomicReference<ZooKeeper> curZk;
synchronized(this)
{
+ isRunning = false;
curZk = zkref;
zkref = null; // this blows up any more usage
}
@@ -154,22 +156,27 @@ public void stop()
private List<String> getChildren(ZookeeperPath path, Watcher watcher) throws MpClusterException
{
- ZooKeeper cur = zkref.get();
- try
+ if (isRunning)
{
- return cur.getChildren(path.path, watcher);
- }
- catch (KeeperException e)
- {
- resetZookeeper(cur);
- throw new MpClusterException("Failed to get active slots (" + path +
- ") on provided zookeeper instance.",e);
- }
- catch (InterruptedException e)
- {
- throw new MpClusterException("Failed to get active slots (" + path +
- ") on provided zookeeper instance.",e);
+ ZooKeeper cur = zkref.get();
+ try
+ {
+ return cur.getChildren(path.path, watcher);
+ }
+ catch (KeeperException e)
+ {
+ resetZookeeper(cur);
+ throw new MpClusterException("Failed to get active slots (" + path +
+ ") on provided zookeeper instance.",e);
+ }
+ catch (InterruptedException e)
+ {
+ throw new MpClusterException("Failed to get active slots (" + path +
+ ") on provided zookeeper instance.",e);
+ }
}
+ throw new MpClusterException("getChildren called on stopped MpCluster (" + path +
+ ") on provided zookeeper instance.");
}
private synchronized void setNewZookeeper(ZooKeeper newZk)
@@ -177,7 +184,7 @@ private synchronized void setNewZookeeper(ZooKeeper newZk)
if (logger.isTraceEnabled())
logger.trace("reestablished connection to " + connectString);
- if (zkref != null)
+ if (isRunning)
{
ZooKeeper last = zkref.getAndSet(newZk);
if (last != null)
@@ -221,14 +228,14 @@ public void run()
}
finally
{
- if (newZk == null && zkref != null) // if zk is null then we stopped so no point in continuing.
+ if (newZk == null && isRunning)
// reschedule me.
scheduler.schedule(this, resetDelay, TimeUnit.MILLISECONDS);
}
// this is true if the reset worked and we're not in the process
// of shutting down.
- if (newZk != null && zkref != null)
+ if (newZk != null && isRunning)
{
// we want the setNewZookeeper and the clearing of the
// beingReset flag to be atomic so future failures that result
@@ -287,7 +294,7 @@ private boolean mkdir(ZooKeeper cur, String path, Watcher watcher, CreateMode mo
}
catch (KeeperException e)
{
- logger.error("Failed to create the root node (" + path + ") on provided zookeeper instance.",e);
+ logger.warn("Failed to create the root node (" + path + ") on provided zookeeper instance.",e);
resetZookeeper(cur);
}
catch (InterruptedException e)
@@ -301,9 +308,8 @@ private boolean mkdir(ZooKeeper cur, String path, Watcher watcher, CreateMode mo
private void initializeApplication(ZookeeperApplication application)
{
ZooKeeper cur = null;
- try { cur = zkref.get();}
- // this means zk has been set to null which means we're stopping so just allow it to stop
- catch (NullPointerException npe) { }
+ if (isRunning)
+ cur = zkref.get();
if (cur != null)
{
@@ -321,9 +327,8 @@ private void initializeCluster(ZookeeperCluster cluster, boolean forceWatcherCal
}
ZooKeeper cur = null;
- try { cur = zkref.get();}
- // this means zk has been set to null which means we're stopping so just allow it to stop
- catch (NullPointerException npe) { }
+ if (isRunning)
+ cur = zkref.get();
if (cur != null)
{
@@ -388,7 +393,7 @@ public void process(WatchedEvent event)
clearState(event);
- if (zkref != null)
+ if (isRunning)
resetZookeeper(zkref.get());
}
}
@@ -561,51 +566,63 @@ public void setSlotInformation(TS info) throws MpClusterException
private boolean join() throws MpClusterException
{
- ZooKeeper cur = zkref.get();
- try
- {
- cur.create(slotPath.path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
- return true;
- }
- catch(KeeperException.NodeExistsException e)
- {
- if(logger.isDebugEnabled())
- logger.debug("Failed to join the cluster " + clusterId +
- ". Couldn't create the node within zookeeper using \"" + slotPath + "\"");
- return false;
- }
- catch(KeeperException e)
+ if (isRunning)
{
- resetZookeeper(cur);
- throw new MpClusterException("Zookeeper failed while trying to join the cluster " + clusterId +
- ". Couldn't create the node within zookeeper using \"" + slotPath + "\"",e);
- }
- catch(InterruptedException e)
- {
- resetZookeeper(cur);
- throw new MpClusterException("Interrupted while trying to join the cluster " + clusterId +
- ". Couldn't create the node within zookeeper using \"" + slotPath + "\"",e);
+ ZooKeeper cur = zkref.get();
+ try
+ {
+ cur.create(slotPath.path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+ return true;
+ }
+ catch(KeeperException.NodeExistsException e)
+ {
+ if(logger.isDebugEnabled())
+ logger.debug("Failed to join the cluster " + clusterId +
+ ". Couldn't create the node within zookeeper using \"" + slotPath + "\"");
+ return false;
+ }
+ catch(KeeperException e)
+ {
+ resetZookeeper(cur);
+ throw new MpClusterException("Zookeeper failed while trying to join the cluster " + clusterId +
+ ". Couldn't create the node within zookeeper using \"" + slotPath + "\"",e);
+ }
+ catch(InterruptedException e)
+ {
+ resetZookeeper(cur);
+ throw new MpClusterException("Interrupted while trying to join the cluster " + clusterId +
+ ". Couldn't create the node within zookeeper using \"" + slotPath + "\"",e);
+ }
}
+
+ throw new MpClusterException("join called on stopped MpClusterSlot (" + slotPath +
+ ") on provided zookeeper instance.");
}
@Override
public synchronized void leave() throws MpClusterException
{
- try
- {
- zkref.get().delete(slotPath.path,-1);
- }
- catch(KeeperException e)
+ if (isRunning)
{
- throw new MpClusterException("Failed to leave the cluster " + clusterId.getApplicationName() +
- ". Couldn't delete the node within zookeeper using \"" + slotPath + "\"",e);
- }
- catch(InterruptedException e)
- {
- throw new MpClusterException("Interrupted while trying to leave the cluster " + clusterId.getApplicationName() +
- ". Couldn't delete the node within zookeeper using \"" + slotPath + "\"",e);
+ try
+ {
+ zkref.get().delete(slotPath.path,-1);
+ }
+ catch(KeeperException e)
+ {
+ throw new MpClusterException("Failed to leave the cluster " + clusterId.getApplicationName() +
+ ". Couldn't delete the node within zookeeper using \"" + slotPath + "\"",e);
+ }
+ catch(InterruptedException e)
+ {
+ throw new MpClusterException("Interrupted while trying to leave the cluster " + clusterId.getApplicationName() +
+ ". Couldn't delete the node within zookeeper using \"" + slotPath + "\"",e);
+ }
}
+ else
+ throw new MpClusterException("leave called on stopped MpClusterSlot (" + slotPath +
+ ") on provided zookeeper instance.");
}
public String toString() { return slotPath.toString(); }
@@ -708,60 +725,67 @@ public void process(WatchedEvent event)
private Object readInfoFromPath(ZookeeperPath path) throws MpClusterException
{
- ObjectInputStream is = null;
- try
+ if (isRunning)
{
- byte[] ret = zkref.get().getData(path.path, true, null);
-
- if (ret != null && ret.length > 0)
+ ObjectInputStream is = null;
+ try
{
- is = new ObjectInputStream(new ByteArrayInputStream(ret));
- return is.readObject();
+ byte[] ret = zkref.get().getData(path.path, true, null);
+
+ if (ret != null && ret.length > 0)
+ {
+ is = new ObjectInputStream(new ByteArrayInputStream(ret));
+ return is.readObject();
+ }
+ return null;
+ }
+ // this is an indication that the node has disappeared since we retrieved
+ // this MpContainerClusterNode
+ catch (KeeperException.NoNodeException e) { return null; }
+ catch (RuntimeException e) { throw e; }
+ catch (Exception e)
+ {
+ throw new MpClusterException("Failed to get node information for (" + path + ").",e);
+ }
+ finally
+ {
+ IOUtils.closeQuietly(is);
}
- return null;
- }
- // this is an indication that the node has disappeared since we retrieved
- // this MpContainerClusterNode
- catch (KeeperException.NoNodeException e) { return null; }
- catch (RuntimeException e) { throw e; }
- catch (Exception e)
- {
- throw new MpClusterException("Failed to get node information for (" + path + ").",e);
- }
- finally
- {
- IOUtils.closeQuietly(is);
}
+ return null;
}
private void setInfoToPath(ZookeeperPath path, Object info) throws MpClusterException
{
- ObjectOutputStream os = null;
- try
+ if (isRunning)
{
- byte[] buf = null;
- if (info != null)
+ ObjectOutputStream os = null;
+ try
{
- // Serialize to a byte array
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- os = new ObjectOutputStream(bos);
- os.writeObject(info);
- os.close(); // flush
+ byte[] buf = null;
+ if (info != null)
+ {
+ // Serialize to a byte array
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ os = new ObjectOutputStream(bos);
+ os.writeObject(info);
+ os.close(); // flush
+
+ // Get the bytes of the serialized object
+ buf = bos.toByteArray();
+ }
- // Get the bytes of the serialized object
- buf = bos.toByteArray();
+ zkref.get().setData(path.path, buf, -1);
+ }
+ catch (RuntimeException e) { throw e;}
+ catch (Exception e)
+ {
+ throw new MpClusterException("Failed to get node information for (" + path + ").",e);
+ }
+ finally
+ {
+ IOUtils.closeQuietly(os);
}
-
- zkref.get().setData(path.path, buf, -1);
- }
- catch (RuntimeException e) { throw e;}
- catch (Exception e)
- {
- throw new MpClusterException("Failed to get node information for (" + path + ").",e);
- }
- finally
- {
- IOUtils.closeQuietly(os);
}
}
View
325 lib-dempsyimpl/src/main/java/com/nokia/dempsy/router/DefaultRoutingStrategy.java
@@ -17,23 +17,33 @@
package com.nokia.dempsy.router;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.nokia.dempsy.DempsyException;
+import com.nokia.dempsy.config.ClusterId;
import com.nokia.dempsy.internal.util.SafeString;
import com.nokia.dempsy.messagetransport.Destination;
import com.nokia.dempsy.mpcluster.MpCluster;
import com.nokia.dempsy.mpcluster.MpClusterException;
import com.nokia.dempsy.mpcluster.MpClusterSlot;
+import com.nokia.dempsy.mpcluster.MpClusterWatcher;
+import com.nokia.dempsy.router.RoutingStrategy.Outbound.Coordinator;
/**
* This Routing Strategy uses the {@link MpCluster} to negotiate with other instances in the
@@ -41,100 +51,278 @@
*/
public class DefaultRoutingStrategy implements RoutingStrategy
{
+ private static final int resetDelay = 500;
+
private static Logger logger = LoggerFactory.getLogger(DefaultRoutingStrategy.class);
private int defaultTotalSlots;
private int defaultNumNodes;
+ /*
+ * These is to control the timing in tests
+ */
+ private static AtomicLong numOutbounds = new AtomicLong(0);
+ private static AtomicLong numOutboundsInitialized = new AtomicLong(0);
+ public static boolean allOutboundsInitialized() { return numOutboundsInitialized.get() == numOutbounds.get(); }
+ public static void resetOutboundsChecking() { numOutbounds = new AtomicLong(0); numOutboundsInitialized = new AtomicLong(0); }
+
public DefaultRoutingStrategy(int defaultTotalSlots, int defaultNumNodes)
{
this.defaultTotalSlots = defaultTotalSlots;
this.defaultNumNodes = defaultNumNodes;
}
- private class Outbound implements RoutingStrategy.Outbound
+ private class Outbound implements RoutingStrategy.Outbound, MpClusterWatcher
{
- private ConcurrentHashMap<Integer, DefaultRouterSlotInfo> destinations = new ConcurrentHashMap<Integer, DefaultRouterSlotInfo>();
- private int totalAddressCounts = -1;
+ private AtomicReference<Destination[]> destinations = new AtomicReference<Destination[]>();
+ private RoutingStrategy.Outbound.Coordinator coordinator;
+ private MpCluster<ClusterInformation, SlotInformation> cluster;
+ private ClusterId clusterId;
+ private Set<Class<?>> messageTypesHandled = null;
+
+ private ScheduledExecutorService scheduler = null;
+
+ private Outbound(RoutingStrategy.Outbound.Coordinator coordinator,
+ MpCluster<ClusterInformation, SlotInformation> cluster)
+ {
+ numOutbounds.incrementAndGet();
+ this.coordinator = coordinator;
+ this.cluster = cluster;
+ this.clusterId = cluster.getClusterId();
+ cluster.addWatcher(this);
+ execSetupDestinations(false);
+ }
+
+ @Override
+ public ClusterId getClusterId() { return clusterId; }
@Override
- public synchronized SlotInformation selectSlotForMessageKey(Object messageKey) throws DempsyException
+ public Destination selectDestinationForMessage(Object messageKey, Object message) throws DempsyException
{
- if (totalAddressCounts < 0)
+ Destination[] destinationArr = destinations.get();
+ if (destinationArr == null)
throw new DempsyException("It appears the Outbound strategy for the message key " +
SafeString.objectDescription(messageKey) +
" is being used prior to initialization.");
- int calculatedModValue = Math.abs(messageKey.hashCode()%totalAddressCounts);
- return destinations.get(calculatedModValue);
+ int calculatedModValue = Math.abs(messageKey.hashCode()%destinationArr.length);
+ return destinationArr[calculatedModValue];
}
- public synchronized void resetCluster(MpCluster<ClusterInformation, SlotInformation> clusterHandle) throws MpClusterException
+ @Override
+ public void process()
{
- if (logger.isTraceEnabled())
- logger.trace("Resetting Outbound Strategy for cluster " + clusterHandle.getClusterId());
-
- destinations.clear();
- int newtotalAddressCounts = fillMapFromActiveSlots(destinations,clusterHandle);
- if (newtotalAddressCounts == 0)
- throw new MpClusterException("The cluster " + clusterHandle.getClusterId() +
- " seems to have invalid slot information. Someone has set the total number of slots to zero.");
- totalAddressCounts = newtotalAddressCounts > 0 ? newtotalAddressCounts : totalAddressCounts;
+ execSetupDestinations(true);
+ }
+
+ @Override
+ public synchronized void stop()
+ {
+ if (scheduler != null)
+ scheduler.shutdown();
+ scheduler = null;
+ }
+
+ /**
+ * This method is protected for testing purposes. Otherwise it would be private.
+ * @return whether or not the setup was successful.
+ */
+ protected synchronized boolean setupDestinations()
+ {
+ try
+ {
+ if (logger.isTraceEnabled())
+ logger.trace("Resetting Outbound Strategy for cluster " + clusterId);
+
+ Map<Integer,DefaultRouterSlotInfo> slotNumbersToSlots = new HashMap<Integer,DefaultRouterSlotInfo>();
+ int newtotalAddressCounts = fillMapFromActiveSlots(slotNumbersToSlots,cluster);
+ if (newtotalAddressCounts == 0)
+ throw new MpClusterException("The cluster " + cluster.getClusterId() +
+ " seems to have invalid slot information. Someone has set the total number of slots to zero.");
+ if (newtotalAddressCounts > 0)
+ {
+ Destination[] newDestinations = new Destination[newtotalAddressCounts];
+ for (Map.Entry<Integer,DefaultRouterSlotInfo> entry : slotNumbersToSlots.entrySet())
+ {
+ DefaultRouterSlotInfo slotInfo = entry.getValue();
+ newDestinations[entry.getKey()] = slotInfo.getDestination();
+
+ // only register the very first time ... for now
+ if (messageTypesHandled == null)
+ {
+ messageTypesHandled = new HashSet<Class<?>>();
+ messageTypesHandled.addAll(slotInfo.getMessageClasses());
+ coordinator.registerOutbound(this, messageTypesHandled);
+ }
+ }
+
+ // now see if anything is changed.
+ Destination[] oldDestinations = destinations.get();
+ if (oldDestinations == null || !Arrays.equals(oldDestinations, newDestinations))
+ destinations.set(newDestinations);
+ }
+ else
+ destinations.set(null);
+
+ return destinations.get() != null;
+ }
+ catch(MpClusterException e)
+ {
+ destinations.set(null);
+ logger.warn("Failed to set up the Outbound for " + clusterId, e);
+ }
+ catch (RuntimeException rte)
+ {
+ logger.error("Failed to set up the Outbound for " + clusterId, rte);
+ }
+ return false;
+ }
+
+ private void execSetupDestinations(final boolean fromProcess)
+ {
+ if (!setupDestinations())
+ {
+ scheduler = Executors.newScheduledThreadPool(1);
+
+ scheduler.schedule(new Runnable(){
+ @Override
+ public void run()
+ {
+ if (!setupDestinations())
+ {
+ synchronized(Outbound.this)
+ {
+ if (scheduler != null)
+ scheduler.schedule(this, resetDelay, TimeUnit.MILLISECONDS);
+ }
+ }
+ else
+ {
+ // at this point the initialize has succeeded
+ if (!fromProcess)
+ numOutboundsInitialized.incrementAndGet();
+
+ synchronized(Outbound.this)
+ {
+ if (scheduler != null)
+ {
+ scheduler.shutdown();
+ scheduler = null;
+ }
+ }
+ }
+ }
+ }, resetDelay, TimeUnit.MILLISECONDS);
+ }
+ else
+ // at this point the initialize has succeeded
+ if (!fromProcess)
+ numOutboundsInitialized.incrementAndGet();
}
+
+
} // end Outbound class definition
- private class Inbound implements RoutingStrategy.Inbound
+ private class Inbound implements RoutingStrategy.Inbound, MpClusterWatcher
{
+ private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
+
private List<Integer> destinationsAcquired = new ArrayList<Integer>();
+ private MpCluster<ClusterInformation, SlotInformation> cluster;
+ private Collection<Class<?>> messageTypes;
+ private Destination thisDestination;
+ private ClusterId clusterId;
+
+ private Inbound(MpCluster<ClusterInformation, SlotInformation> cluster,
+ Collection<Class<?>> messageTypes,
+ Destination thisDestination)
+ {
+ this.cluster = cluster;
+ this.messageTypes = messageTypes;
+ this.thisDestination = thisDestination;
+ this.clusterId = cluster.getClusterId();
+ this.cluster.addWatcher(this);
+ acquireSlots(false);
+ }
@Override
- public synchronized void resetCluster(MpCluster<ClusterInformation, SlotInformation> clusterHandle,
- List<Class<?>> messagesTypes, Destination destination) throws MpClusterException
+ public void process()
{
- if (logger.isTraceEnabled())
- logger.trace("Resetting Inbound Strategy for cluster " + clusterHandle.getClusterId());
-
- int minNodeCount = defaultNumNodes;
- int totalAddressNeeded = defaultTotalSlots;
- Random random = new Random();
-
- //==============================================================================
- // need to verify that the existing slots in destinationsAcquired are still ours
- Map<Integer,DefaultRouterSlotInfo> slotNumbersToSlots = new HashMap<Integer,DefaultRouterSlotInfo>();
- fillMapFromActiveSlots(slotNumbersToSlots,clusterHandle);
- Collection<Integer> slotsToReaquire = new ArrayList<Integer>();
- for (Integer destinationSlot : destinationsAcquired)
- {
- // select the coresponding slot information
- DefaultRouterSlotInfo slotInfo = slotNumbersToSlots.get(destinationSlot);
- if (slotInfo == null || !destination.equals(slotInfo.getDestination()))
- slotsToReaquire.add(destinationSlot);
- }
- //==============================================================================
+ acquireSlots(true);
+ }
+
+ private synchronized void acquireSlots(final boolean fromProcess)
+ {
+ boolean retry = true;
- //==============================================================================
- // Now reaquire the potentially lost slots
- for (Integer slotToReaquire : slotsToReaquire)
+ try
{
- if (!acquireSlot(slotToReaquire, totalAddressNeeded,
- clusterHandle, messagesTypes, destination))
+ if (logger.isTraceEnabled())
+ logger.trace("Resetting Inbound Strategy for cluster " + clusterId);
+
+ int minNodeCount = defaultNumNodes;
+ int totalAddressNeeded = defaultTotalSlots;
+ Random random = new Random();
+
+ //==============================================================================
+ // need to verify that the existing slots in destinationsAcquired are still ours
+ Map<Integer,DefaultRouterSlotInfo> slotNumbersToSlots = new HashMap<Integer,DefaultRouterSlotInfo>();
+ fillMapFromActiveSlots(slotNumbersToSlots,cluster);
+ Collection<Integer> slotsToReaquire = new ArrayList<Integer>();
+ for (Integer destinationSlot : destinationsAcquired)
{
- // in this case, see if I already own it...
- logger.error("Cannot reaquire the slot " + slotToReaquire + " for the cluster " + clusterHandle.getClusterId());
+ // select the coresponding slot information
+ DefaultRouterSlotInfo slotInfo = slotNumbersToSlots.get(destinationSlot);
+ if (slotInfo == null || !thisDestination.equals(slotInfo.getDestination()))
+ slotsToReaquire.add(destinationSlot);
}
- }
- //==============================================================================
+ //==============================================================================
- while(needToGrabMoreSlots(clusterHandle,minNodeCount,totalAddressNeeded))
+ //==============================================================================
+ // Now re-acquire the potentially lost slots
+ for (Integer slotToReaquire : slotsToReaquire)
+ {
+ if (!acquireSlot(slotToReaquire, totalAddressNeeded,
+ cluster, messageTypes, thisDestination))
+ {
+ // in this case, see if I already own it...
+ logger.warn("Cannot reaquire the slot " + slotToReaquire + " for the cluster " + clusterId);
+ }
+ }
+ //==============================================================================
+
+ while(needToGrabMoreSlots(cluster,minNodeCount,totalAddressNeeded))
+ {
+ int randomValue = random.nextInt(totalAddressNeeded);
+ if(destinationsAcquired.contains(randomValue))
+ continue;
+ if (acquireSlot(randomValue, totalAddressNeeded,
+ cluster, messageTypes, thisDestination))
+ destinationsAcquired.add(randomValue);
+ }
+
+ retry = false;
+ }
+ catch(MpClusterException e)
{
- int randomValue = random.nextInt(totalAddressNeeded);
- if(destinationsAcquired.contains(randomValue))
- continue;
- if (acquireSlot(randomValue, totalAddressNeeded,
- clusterHandle, messagesTypes, destination))
- destinationsAcquired.add(randomValue);
+ destinationsAcquired.clear();
+ }
+ finally
+ {
+ // if we never got the destinations set up then kick off a retry
+ if (retry)
+ scheduler.schedule(new Runnable(){
+ @Override
+ public void run() { acquireSlots(fromProcess); }
+ }, resetDelay, TimeUnit.MILLISECONDS);
}
}
+ @Override
+ public void stop()
+ {
+ scheduler.shutdown();
+ }
+
private boolean needToGrabMoreSlots(MpCluster<ClusterInformation, SlotInformation> clusterHandle,
int minNodeCount, int totalAddressNeeded) throws MpClusterException
{
@@ -151,9 +339,19 @@ public boolean doesMessageKeyBelongToNode(Object messageKey)
} // end Inbound class definition
- public RoutingStrategy.Inbound createInbound() { return new Inbound(); }
-
- public RoutingStrategy.Outbound createOutbound() { return new Outbound(); }
+ @Override
+ public RoutingStrategy.Inbound createInbound(MpCluster<ClusterInformation, SlotInformation> cluster,
+ Collection<Class<?>> messageTypes,
+ Destination thisDestination)
+ {
+ return new Inbound(cluster,messageTypes,thisDestination);
+ }
+
+ @Override
+ public RoutingStrategy.Outbound createOutbound(Coordinator coordinator, MpCluster<ClusterInformation, SlotInformation> cluster)
+ {
+ return new Outbound(coordinator,cluster);
+ }
static class DefaultRouterSlotInfo extends SlotInformation
{
@@ -225,6 +423,11 @@ private static int fillMapFromActiveSlots(Map<Integer,DefaultRouterSlotInfo> map
if(slotsFromClusterManager != null)
{
+ // zero is valid but we only want to set it if we are not
+ // going to enter into the loop below.
+ if (slotsFromClusterManager.size() == 0)
+ totalAddressCounts = 0;
+
for(MpClusterSlot<SlotInformation> node: slotsFromClusterManager)
{
DefaultRouterSlotInfo slotInfo = (DefaultRouterSlotInfo)node.getSlotInformation();
@@ -247,7 +450,7 @@ else if (totalAddressCounts != slotInfo.getTotalAddress())
private static boolean acquireSlot(int slotNum, int totalAddressNeeded,
MpCluster<ClusterInformation, SlotInformation> clusterHandle,
- List<Class<?>> messagesTypes, Destination destination) throws MpClusterException
+ Collection<Class<?>> messagesTypes, Destination destination) throws MpClusterException
{
MpClusterSlot<SlotInformation> slot = clusterHandle.join(String.valueOf(slotNum));
if(slot == null)
View
223 lib-dempsyimpl/src/main/java/com/nokia/dempsy/router/Router.java
@@ -34,9 +34,9 @@
import com.nokia.dempsy.Adaptor;
import com.nokia.dempsy.Dempsy;
+import com.nokia.dempsy.Dempsy.Application.Cluster.Node;
import com.nokia.dempsy.DempsyException;
import com.nokia.dempsy.Dispatcher;
-import com.nokia.dempsy.Dempsy.Application.Cluster.Node;
import com.nokia.dempsy.annotations.MessageKey;
import com.nokia.dempsy.annotations.MessageProcessor;
import com.nokia.dempsy.config.ApplicationDefinition;
@@ -54,8 +54,7 @@
import com.nokia.dempsy.mpcluster.MpCluster;
import com.nokia.dempsy.mpcluster.MpClusterException;
import com.nokia.dempsy.mpcluster.MpClusterSession;
-import com.nokia.dempsy.mpcluster.MpClusterSlot;
-import com.nokia.dempsy.mpcluster.MpClusterWatcher;
+import com.nokia.dempsy.router.RoutingStrategy.Outbound;
import com.nokia.dempsy.serialization.SerializationException;
import com.nokia.dempsy.serialization.Serializer;
@@ -87,7 +86,7 @@
*
* <p>A router requires a non-null ApplicationDefinition during construction.</p>
*/
-public class Router implements Dispatcher
+public class Router implements Dispatcher, RoutingStrategy.Outbound.Coordinator
{
private static Logger logger = LoggerFactory.getLogger(Router.class);
@@ -95,9 +94,11 @@
private ApplicationDefinition applicationDefinition = null;
private ConcurrentHashMap<Class<?>, Set<ClusterRouter>> routerMap = new ConcurrentHashMap<Class<?>, Set<ClusterRouter>>();
-
// protected for test access
protected ConcurrentHashMap<Class<?>, Object> missingMsgTypes = new ConcurrentHashMap<Class<?>, Object>();
+
+ private Set<RoutingStrategy.Outbound> outbounds = new HashSet<RoutingStrategy.Outbound>();
+
private MpClusterSession<ClusterInformation, SlotInformation> mpClusterSession = null;
private SenderFactory defaultSenderFactory;
private ClusterId currentCluster = null;
@@ -161,18 +162,32 @@ public void initialize() throws MpClusterException, DempsyException
(currentClusterDef != null && currentClusterDef.hasExplicitDestinations()) ? new HashSet<ClusterId>() : null;
if (explicitClusterDestinations != null)
explicitClusterDestinations.addAll(Arrays.asList(currentClusterDef.getDestinations()));
-
+ //-------------------------------------------------------------------------------------
+ // TODO: This loop will eventually be replaced when the instantiation of the Outbound
+ // is driven from cluster information management events (Zookeeper callbacks).
+ //-------------------------------------------------------------------------------------
// if the currentCluster is set and THAT cluster has explicit destinations
// then those are the only ones we want to consider
for (ClusterDefinition clusterDef : applicationDefinition.getClusterDefinitions())
{
- if (explicitClusterDestinations == null || explicitClusterDestinations.contains(clusterDef.getClusterId()))
+ if ((explicitClusterDestinations == null || explicitClusterDestinations.contains(clusterDef.getClusterId()))
+ && !clusterDef.isRouteAdaptorType())
{
- ClusterRouter router = new ClusterRouter(clusterDef);
- router.setup(false);
+ RoutingStrategy strategy = (RoutingStrategy)clusterDef.getRoutingStrategy();
+ ClusterId clusterId = clusterDef.getClusterId();
+ if (strategy == null)
+ throw new DempsyException("Could not retrieve the routing strategy for " + SafeString.valueOf(clusterId));
+
+ MpCluster<ClusterInformation, SlotInformation> cluster = mpClusterSession.getCluster(clusterId);
+
+ // This create will result in a callback on the Router as the Outbound.Coordinator with a
+ // registration event. The Outbound may (will) call back on the Router to retrieve the
+ // MpClusterSession and register itself with the appropriate cluster.
+ outbounds.add(strategy.createOutbound(this, cluster));
}
}
+ //-------------------------------------------------------------------------------------
}
/**
@@ -266,102 +281,114 @@ public void stop()
routers.addAll(curRouters);
for (ClusterRouter router : routers)
router.stop();
+ for (RoutingStrategy.Outbound ob : outbounds)
+ ob.stop();
+ }
+
+ @Override
+ public void registerOutbound(RoutingStrategy.Outbound outbound, Collection<Class<?>> classes)
+ {
+ synchronized(outbound)
+ {
+ unregisterOutbound(outbound);
+
+ ClusterId clusterId = outbound.getClusterId();
+ if (classes != null && classes.size() > 0)
+ {
+ // find the appropriate ClusterDefinition
+ ClusterDefinition curClusterDef = applicationDefinition.getClusterDefinition(clusterId);
+
+ if (curClusterDef != null)
+ {
+ // create a corresponding ClusterRouter
+ @SuppressWarnings("unchecked")
+ ClusterRouter clusterRouter = new ClusterRouter((Serializer<Object>)curClusterDef.getSerializer(),outbound);
+
+ for (Class<?> clazz : classes)
+ {
+ Set<ClusterRouter> cur = Collections.newSetFromMap(new ConcurrentHashMap<ClusterRouter, Boolean>()); // potential
+ Set<ClusterRouter> tmp = routerMap.putIfAbsent(clazz, cur);
+ if (tmp != null)
+ cur = tmp;
+ cur.add(clusterRouter);
+ }
+ }
+ else
+ {
+ logger.error("Couldn't find the ClusterDefinition for " + clusterId + " while registering the Outbound " +
+ SafeString.objectDescription(outbound) + " given the ApplicationDefinition " + applicationDefinition);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void unregisterOutbound(RoutingStrategy.Outbound outbound)
+ {
+ // we don't want to register and unregister the same Outbound at the same time
+ // but we can handle registering and unregistering different Outbound's
+ synchronized(outbound)
+ {
+ for (Map.Entry<Class<?>,Set<ClusterRouter>> entry : routerMap.entrySet())
+ {
+ Set<ClusterRouter> crs = entry.getValue();
+ for (Iterator<ClusterRouter> iter = crs.iterator(); iter.hasNext(); )
+ {
+ ClusterRouter cur = iter.next();
+ if (cur.strategyOutbound == outbound)
+ iter.remove();
+ }
+ // we're not going to remove a potentially empty set, or purpose.
+ }
+ }
}
/**
* This class routes messages within a particular cluster. It is protected for test
* access only. Otherwise it would be private.
*/
- protected class ClusterRouter implements MpClusterWatcher
+ protected class ClusterRouter
{
private Serializer<Object> serializer;
- private ClusterId clusterId;
- private MpCluster<ClusterInformation,SlotInformation> clusterHandle;
private SenderFactory senderFactory = defaultSenderFactory;
- private volatile boolean isSetup = false;
- private RoutingStrategy strategy;
private RoutingStrategy.Outbound strategyOutbound;
- @SuppressWarnings("unchecked")
- private ClusterRouter(ClusterDefinition clusterDef) throws MpClusterException, DempsyException
+ private ClusterRouter(Serializer<Object> serializer, Outbound strategyOutbound)
{
- this.clusterId = new ClusterId(clusterDef.getClusterId());
-
- Object clusterRs = clusterDef.getRoutingStrategy();
- if (clusterRs == null)
- throw new DempsyException("Could not retrieve the routing strategy for " + SafeString.valueOf(clusterId));
- strategy = (RoutingStrategy)clusterRs;
-
- strategyOutbound = strategy.createOutbound();
-
- serializer = (Serializer<Object>)clusterDef.getSerializer();
- if (serializer == null)
- throw new DempsyException("Could not retrieve the serializer for " + SafeString.valueOf(clusterId));
+ this.strategyOutbound = strategyOutbound;
+ this.serializer = serializer;
}
- @Override
- public void process()
- {
- // it appears that the cluster configuration has changed.
- // we need to reset up the distributor
- try
- {
- setup(true);
- }
- catch(MpClusterException e)
- {
- logger.error("Major problem with the Router. Can't respond to changes in the cluster information:", e);
- }
- }
-
public void route(Object key, Object message)
{
- SlotInformation target = null;
boolean messageFailed = true;
Sender sender = null;
try
{
- target = strategyOutbound.selectSlotForMessageKey(key);
- if(target == null)
- {
- setup(false);
- target = strategyOutbound.selectSlotForMessageKey(key);
- }
+ Destination destination = strategyOutbound.selectDestinationForMessage(key, message);
- if(target != null)
+ if (destination == null)
{
- Destination destination = target.getDestination();
-
- if (destination == null)
- {
- logger.error("Couldn't find a destination for " + SafeString.objectDescription(message) +
- " from the cluster slot information selected " + SafeString.objectDescription(target));
- return;
- }
-
- sender = senderFactory.getSender(destination);
- if (sender == null)
- logger.error("Couldn't figure out a means to send " + SafeString.objectDescription(message) +
- " to " + SafeString.valueOf(destination) + "");
- else
- {
- byte[] data = serializer.serialize(message);
- sender.send(data);
- messageFailed = false;
- if (statsCollector != null) statsCollector.messageSent(message);
- }
-
+ logger.error("Couldn't find a destination for " + SafeString.objectDescription(message));
+ return;
}
+
+ sender = senderFactory.getSender(destination);
+ if (sender == null)
+ logger.error("Couldn't figure out a means to send " + SafeString.objectDescription(message) +
+ " to " + SafeString.valueOf(destination) + "");
else
{
- logger.warn("No destination found for the message " + SafeString.objectDescription(message) +
- " with the key " + SafeString.objectDescription(key));
+ byte[] data = serializer.serialize(message);
+ sender.send(data);
+ messageFailed = false;
+ if (statsCollector != null) statsCollector.messageSent(message);
}
}
catch(DempsyException e)
{
logger.error("Failed to determine the destination for " + SafeString.objectDescription(message) +
- " using the routing strategy " + SafeString.objectDescription(strategy),e);
+ " using the routing strategy " + SafeString.objectDescription(strategyOutbound),e);
}
catch (SerializationException e)
{
@@ -386,52 +413,6 @@ public void route(Object key, Object message)
}
}
- public void setup(boolean force) throws MpClusterException
- {
- if (!isSetup || force)
- {
- synchronized(this)
- {
- if (!isSetup || force) // double checked locking
- {
- isSetup = false;
- clusterHandle = mpClusterSession.getCluster(clusterId);
- clusterHandle.addWatcher(this);
- strategyOutbound.resetCluster(clusterHandle);
-
- Set<Class<?>> messageClasses = new HashSet<Class<?>>();
-
- Collection<MpClusterSlot<SlotInformation>> nodes = clusterHandle.getActiveSlots();
- if(nodes != null)
- {
- Set<Class<?>> msgClass = null;
- for(MpClusterSlot<SlotInformation> node: nodes)
- {
- SlotInformation slotInfo = node.getSlotInformation();
- if(slotInfo != null)
- {
- msgClass = slotInfo.getMessageClasses();
- if (msgClass != null)
- messageClasses.addAll(msgClass);
- }
- }
-
- // now we may have new messageClasses so we need to register with the Router
- for (Class<?> clazz : messageClasses)
- {
- Set<ClusterRouter> cur = Collections.newSetFromMap(new ConcurrentHashMap<ClusterRouter, Boolean>()); // potential
- Set<ClusterRouter> tmp = routerMap.putIfAbsent(clazz, cur);
- if (tmp != null)
- cur = tmp;
- cur.add(this);
- }
-
- }
- }
- }
- }
- }
-
private void stop()
{
try { if (senderFactory != null) senderFactory.stop(); }
View
46 lib-dempsyimpl/src/test/java/com/nokia/dempsy/TestDempsy.java
@@ -16,6 +16,7 @@
package com.nokia.dempsy;
+import static com.nokia.dempsy.TestUtils.poll;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
@@ -44,6 +45,7 @@
import org.springframework.context.support.ClassPathXmlApplicationContext;
import com.nokia.dempsy.Dempsy.Application.Cluster.Node;
+import com.nokia.dempsy.TestUtils.Condition;
import com.nokia.dempsy.annotations.MessageHandler;
import com.nokia.dempsy.annotations.MessageKey;
import com.nokia.dempsy.annotations.MessageProcessor;
@@ -51,6 +53,7 @@
import com.nokia.dempsy.annotations.Start;
import com.nokia.dempsy.config.ClusterId;
import com.nokia.dempsy.mpcluster.zookeeper.ZookeeperTestServer.InitZookeeperServerBean;
+import com.nokia.dempsy.router.DefaultRoutingStrategy;
public class TestDempsy
{
@@ -58,7 +61,7 @@
private static long baseTimeoutMillis = 20000; // 20 seconds
- String[] dempsyConfigs = new String[] { "testDempsy/Dempsy.xml" /*, "testDempsy/Dempsy-MultiThreadedStartup.xml"*/ };
+ String[] dempsyConfigs = new String[] { "testDempsy/Dempsy.xml" };
String[] clusterManagers = new String[]{ "testDempsy/ClusterManager-ZookeeperActx.xml", "testDempsy/ClusterManager-LocalVmActx.xml" };
String[] transports = new String[]
@@ -244,6 +247,8 @@ public void runAllCombinations(String applicationContext, Checker checker) throw
logger.debug(" test: " + (checker == null ? "none" : checker) + " using " + dempsyConfig + "," + clusterManager + "," + transport);
logger.debug("*****************************************************************");
+ DefaultRoutingStrategy.resetOutboundsChecking();
+
String[] ctx = new String[4];
ctx[0] = dempsyConfig;
ctx[1] = clusterManager;
@@ -253,8 +258,10 @@ public void runAllCombinations(String applicationContext, Checker checker) throw
logger.debug("Starting up the appliction context ...");
ClassPathXmlApplicationContext actx = new ClassPathXmlApplicationContext(ctx);
actx.registerShutdownHook();
-
+
Dempsy dempsy = (Dempsy)actx.getBean("dempsy");
+
+ assertTrue(TestUtils.waitForClustersToBeInitialized(baseTimeoutMillis, 20, dempsy));
WaitForShutdown waitingForShutdown = new WaitForShutdown(dempsy);
Thread waitingForShutdownThread = new Thread(waitingForShutdown,"Waiting For Shutdown");
@@ -323,15 +330,12 @@ public void testIndividualClusterStart() throws Throwable
@Test(expected=BeanCreationException.class)
public void testInValidClusterStart() throws Throwable
{
- ClassPathXmlApplicationContext actx = new ClassPathXmlApplicationContext(
+ new ClassPathXmlApplicationContext(
"testDempsy/Dempsy-InValidClusterStart.xml",
"testDempsy/Transport-PassthroughActx.xml",
"testDempsy/ClusterManager-LocalVmActx.xml",
"testDempsy/SimpleMultistageApplicationActx.xml"
);
- actx.registerShutdownHook();
- actx.stop();
- actx.destroy();
}
@Test
@@ -401,11 +405,8 @@ public void check(ApplicationContext context) throws Throwable
// instead of the latch we are going to poll for the correct result
// wait for it to be received.
- for (long endTime = System.currentTimeMillis() + baseTimeoutMillis;
- endTime > System.currentTimeMillis() && !message.equals(mp.lastReceived.get());)
- Thread.sleep(1);
- assertEquals(message,mp.lastReceived.get());
-
+ final Object msg = message;
+ assertTrue(poll(baseTimeoutMillis,mp,new Condition<TestMp>() { @Override public boolean conditionMet(TestMp mp) { return msg.equals(mp.lastReceived.get()); } }));
// verify we haven't called it again, not that there's really
// a way to given the code
@@ -445,10 +446,8 @@ public void check(ApplicationContext context) throws Throwable
// instead of the latch we are going to poll for the correct result
// wait for it to be received.
- for (long endTime = System.currentTimeMillis() + baseTimeoutMillis;
- endTime > System.currentTimeMillis() && !message.equals(mp.lastReceived.get());)
- Thread.sleep(1);
- assertEquals(message,mp.lastReceived.get());
+ final Object msg = message;
+ assertTrue(poll(baseTimeoutMillis,mp,new Condition<TestMp>() { @Override public boolean conditionMet(TestMp mp) { return msg.equals(mp.lastReceived.get()); } }));
assertEquals(adaptor2.lastSent,message);
assertEquals(adaptor2.lastSent,mp.lastReceived.get());
@@ -559,31 +558,20 @@ public void check(ApplicationContext context) throws Throwable
// instead of the latch we are going to poll for the correct result
// wait for it to be received.
- for (long endTime = System.currentTimeMillis() + baseTimeoutMillis;
- endTime > System.currentTimeMillis() && mp.cloneCalls.get()<2;)
- Thread.sleep(1);
-
- assertEquals(2, mp.cloneCalls.get());
+ assertTrue(poll(baseTimeoutMillis,mp,new Condition<TestMp>() { @Override public boolean conditionMet(TestMp mp) { return mp.cloneCalls.get()==2; } }));
TestAdaptor adaptor = (TestAdaptor)context.getBean("adaptor");
adaptor.pushMessage(new TestMessage("output")); // this causes the container to clone the Mp
// instead of the latch we are going to poll for the correct result
// wait for it to be received.
- for (long endTime = System.currentTimeMillis() + baseTimeoutMillis;
- endTime > System.currentTimeMillis() && mp.cloneCalls.get()<3;)
- Thread.sleep(1);
-
- assertEquals(3, mp.cloneCalls.get());
+ assertTrue(poll(baseTimeoutMillis,mp,new Condition<TestMp>() { @Override public boolean conditionMet(TestMp mp) { return mp.cloneCalls.get()==3; } }));
adaptor.pushMessage(new TestMessage("test1")); // this causes the container to clone the Mp
// instead of the latch we are going to poll for the correct result
// wait for it to be received.
- for (long endTime = System.currentTimeMillis() + baseTimeoutMillis;
- endTime > System.currentTimeMillis() && mp.cloneCalls.get()<3;)
- Thread.sleep(1);
- assertEquals(3, mp.cloneCalls.get());
+ assertTrue(poll(baseTimeoutMillis,mp,new Condition<TestMp>() { @Override public boolean conditionMet(TestMp mp) { return mp.cloneCalls.get()==3; } }));
List<Node> nodes = dempsy.getCluster(new ClusterId("test-app","test-cluster1")).getNodes();
Assert.assertNotNull(nodes);
Assert.assertTrue(nodes.size()>0);
View
82 lib-dempsyimpl/src/test/java/com/nokia/dempsy/TestUtils.java
@@ -0,0 +1,82 @@
+package com.nokia.dempsy;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.junit.Ignore;
+
+import com.nokia.dempsy.config.ClusterId;
+import com.nokia.dempsy.mpcluster.MpCluster;
+import com.nokia.dempsy.mpcluster.MpClusterException;
+import com.nokia.dempsy.mpcluster.MpClusterSession;
+import com.nokia.dempsy.mpcluster.MpClusterSlot;
+import com.nokia.dempsy.router.ClusterInformation;
+import com.nokia.dempsy.router.DefaultRoutingStrategy;
+import com.nokia.dempsy.router.SlotInformation;
+
+@Ignore
+public class TestUtils
+{
+ public static interface Condition<T>
+ {
+ public boolean conditionMet(T o);
+ }
+
+ public static <T> boolean poll(long timeoutMillis, T userObject, Condition<T> condition) throws InterruptedException
+ {
+ for (long endTime = System.currentTimeMillis() + timeoutMillis;
+ endTime > System.currentTimeMillis() && !condition.conditionMet(userObject);)
+ Thread.sleep(1);
+ return condition.conditionMet(userObject);
+ }
+
+ public static boolean waitForClustersToBeInitialized(long timeoutMillis,
+ final int numSlotsPerCluster, Dempsy dempsy) throws InterruptedException
+ {
+ try
+ {
+ final List<ClusterId> clusters = new ArrayList<ClusterId>();
+
+ // find out all of the ClusterIds
+ for (Dempsy.Application app : dempsy.applications)
+ {
+ for (Dempsy.Application.Cluster cluster : app.appClusters)
+ if (!cluster.clusterDefinition.isRouteAdaptorType())
+ clusters.add(new ClusterId(cluster.clusterDefinition.getClusterId()));
+ }
+
+ MpClusterSession<ClusterInformation, SlotInformation> session = dempsy.clusterSessionFactory.createSession();
+ boolean ret = poll(timeoutMillis, session, new Condition<MpClusterSession<ClusterInformation, SlotInformation>>()
+ {
+ @Override
+ public boolean conditionMet(MpClusterSession<ClusterInformation, SlotInformation> session)
+ {
+ try
+ {
+ for (ClusterId c : clusters)
+ {
+ MpCluster<ClusterInformation, SlotInformation> cluster = session.getCluster(c);
+ Collection<MpClusterSlot<SlotInformation>> slots = cluster.getActiveSlots();
+ if (slots == null || slots.size() != numSlotsPerCluster)
+ return false;
+ }
+ }
+ catch(MpClusterException e)
+ {
+ return false;
+ }
+
+ return DefaultRoutingStrategy.allOutboundsInitialized();
+ }
+ });
+
+ session.stop();
+ return ret;
+ }
+ catch (MpClusterException e)
+ {
+ return false;
+ }
+ }
+}
View
7 lib-dempsyimpl/src/test/java/com/nokia/dempsy/container/TestMpContainer.java
@@ -59,6 +59,7 @@
private Serializer<Object> serializer = new JavaSerializer<Object>();
private ClassPathXmlApplicationContext context;
+ private long baseTimeoutMillis = 2000;
public static class DummyDispatcher implements Dispatcher
{
@@ -243,7 +244,7 @@ public void testOutputInvoker() throws Exception {
@Test
public void testEvictable() throws Exception {
inputQueue.add(serializer.serialize(new ContainerTestMessage("foo")));
- outputQueue.poll(1000, TimeUnit.MILLISECONDS);
+ assertNotNull(outputQueue.poll(baseTimeoutMillis, TimeUnit.MILLISECONDS));
assertEquals("did not create MP", 1, container.getProcessorCount());
@@ -253,7 +254,7 @@ public void testEvictable() throws Exception {
assertEquals("invocation count, 1st message", 1, mp.invocationCount);
inputQueue.add(serializer.serialize(new ContainerTestMessage("foo")));
- outputQueue.poll(1000, TimeUnit.MILLISECONDS);
+ assertNotNull(outputQueue.poll(baseTimeoutMillis, TimeUnit.MILLISECONDS));
assertEquals("activation count, 2nd message", 1, mp.activationCount);
assertEquals("invocation count, 2nd message", 2, mp.invocationCount);
@@ -262,7 +263,7 @@ public void testEvictable() throws Exception {
mp.evict = true;
container.evict();
inputQueue.add(serializer.serialize(new ContainerTestMessage("foo")));
- outputQueue.poll(1000, TimeUnit.MILLISECONDS);
+ assertNotNull(outputQueue.poll(baseTimeoutMillis, TimeUnit.MILLISECONDS));
assertEquals("Clone count, 2nd message", tmpCloneCount+1, TestProcessor.cloneCount);
}
View
15 lib-dempsyimpl/src/test/java/com/nokia/dempsy/mpcluster/TestAllMpClusterImpls.java
@@ -21,6 +21,8 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static com.nokia.dempsy.TestUtils.*;
+
import java.io.IOException;
import java.util.ArrayList;
@@ -92,19 +94,6 @@ public static void shutdownZookeeper()
}
}
- private interface Condition<T>
- {
- public boolean conditionMet(T o);
- }
-
- public static <T> boolean poll(long timeoutMillis, T userObject, Condition<T> condition) throws InterruptedException
- {
- for (long endTime = System.currentTimeMillis() + timeoutMillis;
- endTime > System.currentTimeMillis() && !condition.conditionMet(userObject);)
- Thread.sleep(1);
- return condition.conditionMet(userObject);
- }
-
@Test
public void testMpClusterFromFactory() throws Throwable
{
View
35 lib-dempsyimpl/src/test/java/com/nokia/dempsy/mpcluster/zookeeper/TestFullApp.java
@@ -16,6 +16,7 @@
package com.nokia.dempsy.mpcluster.zookeeper;
+import static com.nokia.dempsy.TestUtils.poll;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@@ -36,6 +37,7 @@
import org.springframework.context.support.ClassPathXmlApplicationContext;
import com.nokia.dempsy.Dempsy;
+import com.nokia.dempsy.TestUtils.Condition;
import com.nokia.dempsy.config.ApplicationDefinition;
import com.nokia.dempsy.config.ClusterDefinition;
import com.nokia.dempsy.config.ClusterId;
@@ -57,11 +59,6 @@
private static final String transport = "testDempsy/Transport-TcpActx.xml";
private static final long baseTimeoutMillis = 10000;
- private interface Condition
- {
- public boolean conditionMet(Object o);
- }
-
private static String[] ctx = new String[4];
static {
ctx[0] = dempsyConfig;
@@ -88,14 +85,6 @@ public void shutdownZookeeper()
zkServer.stop();
}
- public static boolean poll(long timeoutMillis, Object userObject, Condition condition) throws InterruptedException
- {
- for (long endTime = System.currentTimeMillis() + timeoutMillis;
- endTime > System.currentTimeMillis() && !condition.conditionMet(userObject);)
- Thread.sleep(1);
- return condition.conditionMet(userObject);
- }
-
@Test
public void testStartStop() throws Throwable
{
@@ -114,7 +103,7 @@ public void testStartStop() throws Throwable
final FullApplication app = (FullApplication)actx.getBean("app");
// this checks that the throughput works.
- assertTrue(poll(baseTimeoutMillis * 5, app, new Condition()
+ assertTrue(poll(baseTimeoutMillis * 5, app, new Condition<Object>()
{
@Override
public boolean conditionMet(Object o)
@@ -182,7 +171,7 @@ public void testStartForceMpDisconnectStop() throws Throwable
final StatsCollector collector = node.getStatsCollector();
// this checks that the throughput works.
- assertTrue(poll(baseTimeoutMillis * 5, app, new Condition()
+ assertTrue(poll(baseTimeoutMillis * 5, app, new Condition<Object>()
{
@Override
public boolean conditionMet(Object o)
@@ -226,7 +215,7 @@ public boolean conditionMet(Object o)
final long interimMessageCount = prototype.myMpReceived.get();
// and now we should eventually get more as the session recovers.
- assertTrue(poll(baseTimeoutMillis * 5, app, new Condition()
+ assertTrue(poll(baseTimeoutMillis * 5, app, new Condition<Object>()
{
@Override
public boolean conditionMet(Object o)
@@ -297,7 +286,7 @@ public void testStartForceMpDisconnectWithStandby() throws Throwable
cluster.instantiateAndStartAnotherNodeForTesting(); // the code for start instantiates a new node
// this checks that the throughput works.
- assertTrue(poll(baseTimeoutMillis * 5, app, new Condition()
+ assertTrue(poll(baseTimeoutMillis * 5, app, new Condition<Object>()
{
@Override
public boolean conditionMet(Object o)
@@ -342,7 +331,7 @@ public boolean conditionMet(Object o)
final long interimMessageCount = prototype.myMpReceived.get();
// and now we should eventually get more as the session recovers.
- assertTrue(poll(baseTimeoutMillis * 5, app, new Condition()
+ assertTrue(poll(baseTimeoutMillis * 5, app, new Condition<Object>()
{
@Override
public boolean conditionMet(Object o)
@@ -423,7 +412,7 @@ public void testSeparateClustersInOneVm() throws Throwable
final FullApplication app = (FullApplication)actx.getBean("app");
// this checks that the throughput works.
- assertTrue(poll(baseTimeoutMillis * 5, app, new Condition()
+ assertTrue(poll(baseTimeoutMillis * 5, app, new Condition<Object>()
{