From da29bc98c5c49b0c34fa238cfdb6a578e0722450 Mon Sep 17 00:00:00 2001 From: Paul Dudenkov Date: Tue, 30 Aug 2016 18:25:59 +0300 Subject: [PATCH] KAA-1281: kaa node blocks startup if zookeeper is unavailable --- .../server/common/zk/ControlNodeTracker.java | 44 +--- .../server/common/zk/WorkerNodeTracker.java | 28 +-- .../common/zk/bootstrap/BootstrapNode.java | 18 +- .../server/common/zk/control/ControlNode.java | 23 +- .../common/zk/operations/OperationsNode.java | 21 +- .../kaa/server/common/zk/BootstrapNodeIT.java | 144 ++++++----- .../kaa/server/common/zk/ControlNodeIT.java | 178 +++++++------- .../server/common/zk/OperationsNodeIT.java | 225 +++++++++--------- .../BootstrapInitializationService.java | 7 +- .../control/service/zk/ControlZkService.java | 8 +- .../KaaNodeInitializationService.java | 23 +- .../OperationsInitializationService.java | 8 +- .../src/main/resources/kaaNodeContext.xml | 14 ++ .../kaa/server/control/TestCluster.java | 9 +- .../event/EventServiceThriftTestIT.java | 5 +- .../operations/service/event/TestCluster.java | 8 +- 16 files changed, 411 insertions(+), 352 deletions(-) diff --git a/server/common/zk/src/main/java/org/kaaproject/kaa/server/common/zk/ControlNodeTracker.java b/server/common/zk/src/main/java/org/kaaproject/kaa/server/common/zk/ControlNodeTracker.java index 1d47089c9c..862e1af86f 100644 --- a/server/common/zk/src/main/java/org/kaaproject/kaa/server/common/zk/ControlNodeTracker.java +++ b/server/common/zk/src/main/java/org/kaaproject/kaa/server/common/zk/ControlNodeTracker.java @@ -21,9 +21,7 @@ import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; -import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.UnhandledErrorListener; import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.NodeCache; @@ -56,7 +54,7 @@ public abstract class ControlNodeTracker implements ControlNodeAware, Closeable .getLogger(ControlNodeTracker.class); /** The client. */ - protected CuratorFramework client; + protected CuratorFramework zkClient; /** The control cache. */ private NodeCache controlCache; @@ -107,34 +105,9 @@ public void unhandledError(String message, Throwable e) { /** * Instantiates a new control node tracker. * - * @param zkHostPortList - * the zk host port list - * @param retryPolicy - * the retry policy */ - public ControlNodeTracker(String zkHostPortList, RetryPolicy retryPolicy) { - this(zkHostPortList, -1, -1, retryPolicy); - } - - /** - * Instantiates a new control node tracker. - * - * @param zkHostPortList - * the zk host port list - * @param sessionTimeoutMs - * session timeout - * @param connectionTimeoutMs - * connection timeout - * @param retryPolicy - * the retry policy - */ - public ControlNodeTracker(String zkHostPortList, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy) { + public ControlNodeTracker() { super(); - if (sessionTimeoutMs > -1 && connectionTimeoutMs > -1) { - this.client = CuratorFrameworkFactory.newClient(zkHostPortList, sessionTimeoutMs, connectionTimeoutMs, retryPolicy); - } else { - this.client = CuratorFrameworkFactory.newClient(zkHostPortList, retryPolicy); - } this.listeners = new CopyOnWriteArrayList(); } @@ -146,10 +119,9 @@ public ControlNodeTracker(String zkHostPortList, int sessionTimeoutMs, int conne */ public void start() throws Exception { //NOSONAR LOG.info("Starting node tracker"); - client.start(); - client.getUnhandledErrorListenable().addListener(errorsListener); + zkClient.getUnhandledErrorListenable().addListener(errorsListener); if(createZkNode()){ - controlCache = new NodeCache(client, CONTROL_SERVER_NODE_PATH); + controlCache = new NodeCache(zkClient, CONTROL_SERVER_NODE_PATH); controlCache.getListenable().addListener(new NodeCacheListener() { @Override @@ -204,7 +176,7 @@ protected void onMasterChange(ChildData currentData) { * @return true, if is connected */ public boolean isConnected() { - return client.getZookeeperClient().isConnected(); + return zkClient.getZookeeperClient().isConnected(); } /** @@ -250,14 +222,12 @@ public void close() throws IOException { if(nodePath != null){ try { - client.delete().forPath(nodePath); + zkClient.delete().forPath(nodePath); LOG.debug("Node with path {} successfully deleted", nodePath); } catch (Exception e) { LOG.debug("Failed to delete node", e); } } - - client.close(); } /* @@ -299,7 +269,7 @@ public boolean doZKClientAction(ZKClientAction action) throws IOException{ public boolean doZKClientAction(ZKClientAction action, boolean throwIOException) throws IOException{ try{ - action.doWithZkClient(client); + action.doWithZkClient(zkClient); return true; } catch (Exception e) { LOG.error("Unknown Error", e); diff --git a/server/common/zk/src/main/java/org/kaaproject/kaa/server/common/zk/WorkerNodeTracker.java b/server/common/zk/src/main/java/org/kaaproject/kaa/server/common/zk/WorkerNodeTracker.java index 196b2c7b19..1775909b24 100644 --- a/server/common/zk/src/main/java/org/kaaproject/kaa/server/common/zk/WorkerNodeTracker.java +++ b/server/common/zk/src/main/java/org/kaaproject/kaa/server/common/zk/WorkerNodeTracker.java @@ -20,7 +20,6 @@ import java.util.*; import java.util.concurrent.CopyOnWriteArrayList; -import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.PathChildrenCache; @@ -65,36 +64,25 @@ public abstract class WorkerNodeTracker extends ControlNodeTracker { /** * Instantiates a new worker node tracker. * - * @param zkHostPortList - * the zk host port list - * @param retryPolicy - * the retry policy */ - public WorkerNodeTracker(String zkHostPortList, RetryPolicy retryPolicy) { - super(zkHostPortList, retryPolicy); + public WorkerNodeTracker() { + super(); init(); } /** * Instantiates a new worker node tracker. - * - * @param zkHostPortList - * the zk host port list - * @param sessionTimeoutMs - * the session timeout - * @param connectionTimeoutMs - * the connection timeout - * @param retryPolicy - * the retry policy + * @param zkClient Zookeeper client */ - public WorkerNodeTracker(String zkHostPortList, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy) { - super(zkHostPortList, sessionTimeoutMs, connectionTimeoutMs, retryPolicy); + public WorkerNodeTracker(CuratorFramework zkClient) { + super(); + this.zkClient = zkClient; init(); } private void init() { - endpointCache = new PathChildrenCache(client, OPERATIONS_SERVER_NODE_PATH, true); - bootstrapCache = new PathChildrenCache(client, BOOTSTRAP_SERVER_NODE_PATH, true); + endpointCache = new PathChildrenCache(zkClient, OPERATIONS_SERVER_NODE_PATH, true); + bootstrapCache = new PathChildrenCache(zkClient, BOOTSTRAP_SERVER_NODE_PATH, true); endpointListeners = new CopyOnWriteArrayList(); bootstrapListeners = new CopyOnWriteArrayList(); operationNodesStartTimes = new HashMap(); diff --git a/server/common/zk/src/main/java/org/kaaproject/kaa/server/common/zk/bootstrap/BootstrapNode.java b/server/common/zk/src/main/java/org/kaaproject/kaa/server/common/zk/bootstrap/BootstrapNode.java index 71becd60fd..a29af986ac 100644 --- a/server/common/zk/src/main/java/org/kaaproject/kaa/server/common/zk/bootstrap/BootstrapNode.java +++ b/server/common/zk/src/main/java/org/kaaproject/kaa/server/common/zk/bootstrap/BootstrapNode.java @@ -43,12 +43,20 @@ public class BootstrapNode extends WorkerNodeTracker { * Instantiates a new bootstrap node. * * @param nodeInfo the node info - * @param zkHostPortList the zk host port list - * @param retryPolicy the retry policy */ - public BootstrapNode(BootstrapNodeInfo nodeInfo, String zkHostPortList, - RetryPolicy retryPolicy) { - super(zkHostPortList, retryPolicy); + public BootstrapNode(BootstrapNodeInfo nodeInfo) { + super(); + this.nodeInfo = nodeInfo; + } + + /** + * Instantiates a new bootstrap node. + * + * @param nodeInfo the node info + * @param zkClient Zookeeper client + */ + public BootstrapNode(BootstrapNodeInfo nodeInfo, CuratorFramework zkClient) { + super(zkClient); this.nodeInfo = nodeInfo; } diff --git a/server/common/zk/src/main/java/org/kaaproject/kaa/server/common/zk/control/ControlNode.java b/server/common/zk/src/main/java/org/kaaproject/kaa/server/common/zk/control/ControlNode.java index 11ccadce7a..4efc999f45 100644 --- a/server/common/zk/src/main/java/org/kaaproject/kaa/server/common/zk/control/ControlNode.java +++ b/server/common/zk/src/main/java/org/kaaproject/kaa/server/common/zk/control/ControlNode.java @@ -46,13 +46,9 @@ public class ControlNode extends WorkerNodeTracker { * * @param currentNodeInfo * the current node info - * @param zkHostPortList - * the zk host port list - * @param retryPolicy - * the retry policy */ - public ControlNode(ControlNodeInfo currentNodeInfo, String zkHostPortList, RetryPolicy retryPolicy) { - super(zkHostPortList, retryPolicy); + public ControlNode(ControlNodeInfo currentNodeInfo) { + super(); this.currentNodeInfo = currentNodeInfo; } @@ -61,17 +57,10 @@ public ControlNode(ControlNodeInfo currentNodeInfo, String zkHostPortList, Retry * * @param currentNodeInfo * the current node info - * @param zkHostPortList - * the zk host port list - * @param sessionTimeoutMs - * session timeout - * @param connectionTimeoutMs - * connection timeout - * @param retryPolicy - * the retry policy + * @param zkClient Zookeeper client */ - public ControlNode(ControlNodeInfo currentNodeInfo, String zkHostPortList, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy) { - super(zkHostPortList, sessionTimeoutMs, connectionTimeoutMs, retryPolicy); + public ControlNode(ControlNodeInfo currentNodeInfo, CuratorFramework zkClient) { + super(zkClient); this.currentNodeInfo = currentNodeInfo; } @@ -127,7 +116,7 @@ public ControlNodeInfo getCurrentNodeInfo() { @Override public boolean createZkNode() throws IOException { try { - nodePath = client.create().withMode(CreateMode.EPHEMERAL) + nodePath = zkClient.create().withMode(CreateMode.EPHEMERAL) .forPath(ControlNodeTracker.CONTROL_SERVER_NODE_PATH, controlNodeAvroConverter.get().toByteArray(currentNodeInfo)); LOG.info("Created node with path: " + nodePath); } catch (NodeExistsException e) { diff --git a/server/common/zk/src/main/java/org/kaaproject/kaa/server/common/zk/operations/OperationsNode.java b/server/common/zk/src/main/java/org/kaaproject/kaa/server/common/zk/operations/OperationsNode.java index 924c2b2591..452cd1c8a0 100644 --- a/server/common/zk/src/main/java/org/kaaproject/kaa/server/common/zk/operations/OperationsNode.java +++ b/server/common/zk/src/main/java/org/kaaproject/kaa/server/common/zk/operations/OperationsNode.java @@ -43,13 +43,22 @@ public class OperationsNode extends WorkerNodeTracker { * * @param nodeInfo * the node info - * @param zkHostPortList - * the zk host port list - * @param retryPolicy - * the retry policy */ - public OperationsNode(OperationsNodeInfo nodeInfo, String zkHostPortList, RetryPolicy retryPolicy) { - super(zkHostPortList, retryPolicy); + public OperationsNode(OperationsNodeInfo nodeInfo) { + super(); + this.nodeInfo = nodeInfo; + this.nodeInfo.setTimeStarted(System.currentTimeMillis()); + } + + /** + * Instantiates a new endpoint node. + * + * @param nodeInfo + * the node info + * @param zkClient Zookeeper client + */ + public OperationsNode(OperationsNodeInfo nodeInfo, CuratorFramework zkClient) { + super(zkClient); this.nodeInfo = nodeInfo; this.nodeInfo.setTimeStarted(System.currentTimeMillis()); } diff --git a/server/common/zk/src/test/java/org/kaaproject/kaa/server/common/zk/BootstrapNodeIT.java b/server/common/zk/src/test/java/org/kaaproject/kaa/server/common/zk/BootstrapNodeIT.java index 2bb32a3100..b4682fdfa9 100644 --- a/server/common/zk/src/test/java/org/kaaproject/kaa/server/common/zk/BootstrapNodeIT.java +++ b/server/common/zk/src/test/java/org/kaaproject/kaa/server/common/zk/BootstrapNodeIT.java @@ -29,9 +29,13 @@ import java.util.List; import org.apache.curator.RetryPolicy; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.test.TestingCluster; import org.apache.curator.test.Timing; +import org.junit.After; +import org.junit.Before; import org.junit.Test; import org.kaaproject.kaa.server.common.zk.bootstrap.BootstrapNode; import org.kaaproject.kaa.server.common.zk.bootstrap.BootstrapNodeListener; @@ -49,86 +53,100 @@ public class BootstrapNodeIT { static final int HTTP_ID = 42; private static final String BOOTSTRAP_NODE_HOST = "192.168.0.202"; private static final String CONTROL_NODE_HOST = "192.168.0.1"; + private CuratorFramework zkClient; + private TestingCluster cluster; + + @Before + public void beforeTest() { + try { + cluster = new TestingCluster(3); + cluster.start(); + zkClient = CuratorFrameworkFactory.newClient(cluster.getConnectString(), buildDefaultRetryPolicy()); + zkClient.start(); + } catch (Exception e) { + System.err.println("Unable to initialize cluster before test! " + e); + } + } + + @After + public void afterTest() { + try { + zkClient.close(); + cluster.close(); + } catch (Exception e) { + System.err.println("Unable to shutdown cluster after test! " + e); + } + } @Test public void boostrapListenerTest() throws Exception { Timing timing = new Timing(); - TestingCluster cluster = new TestingCluster(3); - cluster.start(); - try { - ControlNodeInfo controlNodeInfo = buildControlNodeInfo(); - BootstrapNodeInfo bootstrapNodeInfo = buildBootstrapNodeInfo(); - ControlNode controlNode = new ControlNode(controlNodeInfo, cluster.getConnectString(), buildDefaultRetryPolicy()); - BootstrapNodeListener mockListener = mock(BootstrapNodeListener.class); - controlNode.addListener(mockListener); - controlNode.start(); + ControlNodeInfo controlNodeInfo = buildControlNodeInfo(); + BootstrapNodeInfo bootstrapNodeInfo = buildBootstrapNodeInfo(); - BootstrapNode bootstrapNode = new BootstrapNode(bootstrapNodeInfo, cluster.getConnectString(), buildDefaultRetryPolicy()); - bootstrapNode.start(); - timing.sleepABit(); + ControlNode controlNode = new ControlNode(controlNodeInfo, zkClient); + BootstrapNodeListener mockListener = mock(BootstrapNodeListener.class); + controlNode.addListener(mockListener); + controlNode.start(); - verify(mockListener).onNodeAdded(bootstrapNodeInfo); + BootstrapNode bootstrapNode = new BootstrapNode(bootstrapNodeInfo, zkClient); + bootstrapNode.start(); + timing.sleepABit(); - List transports = bootstrapNodeInfo.getTransports(); - transports.remove(getHttpTransportMD()); - bootstrapNode.updateNodeData(bootstrapNodeInfo); - timing.sleepABit(); + verify(mockListener).onNodeAdded(bootstrapNodeInfo); - verify(mockListener).onNodeUpdated(bootstrapNodeInfo); + List transports = bootstrapNodeInfo.getTransports(); + transports.remove(getHttpTransportMD()); + bootstrapNode.updateNodeData(bootstrapNodeInfo); + timing.sleepABit(); - bootstrapNode.close(); - timing.sleepABit(); + verify(mockListener).onNodeUpdated(bootstrapNodeInfo); - verify(mockListener).onNodeRemoved(bootstrapNodeInfo); - bootstrapNode.close(); + bootstrapNode.close(); + timing.sleepABit(); - assertTrue(controlNode.removeListener(mockListener)); - assertFalse(controlNode.removeListener(mockListener)); - controlNode.close(); - } finally { - cluster.close(); - } + verify(mockListener).onNodeRemoved(bootstrapNodeInfo); + bootstrapNode.close(); + + assertTrue(controlNode.removeListener(mockListener)); + assertFalse(controlNode.removeListener(mockListener)); + controlNode.close(); } @Test public void outdatedRemovalTest() throws Exception { Timing timing = new Timing(); - TestingCluster cluster = new TestingCluster(3); - cluster.start(); - try { - ControlNodeInfo controlNodeInfo = buildControlNodeInfo(); - BootstrapNodeInfo bootstrapNodeInfo = buildBootstrapNodeInfo(); - - ControlNode controlNode = new ControlNode(controlNodeInfo, cluster.getConnectString(), buildDefaultRetryPolicy()); - BootstrapNodeListener mockListener = mock(BootstrapNodeListener.class); - controlNode.addListener(mockListener); - controlNode.start(); - - BootstrapNode bootstrapNode = new BootstrapNode(bootstrapNodeInfo, cluster.getConnectString(), buildDefaultRetryPolicy()); - bootstrapNode.start(); - timing.sleepABit(); - - verify(mockListener).onNodeAdded(bootstrapNodeInfo); - - BootstrapNodeInfo bootstrapNodeInfoWithGreaterTimeStarted = buildBootstrapNodeInfo(); - BootstrapNode bootstrapNodeWithGreaterTimeStarted = new BootstrapNode(bootstrapNodeInfoWithGreaterTimeStarted, cluster.getConnectString(), buildDefaultRetryPolicy()); - - bootstrapNodeWithGreaterTimeStarted.start(); - timing.sleepABit(); - - bootstrapNode.close(); - timing.sleepABit(); - verify(mockListener, never()).onNodeRemoved(bootstrapNodeInfo); - - bootstrapNodeWithGreaterTimeStarted.close(); - timing.sleepABit(); - verify(mockListener).onNodeRemoved(bootstrapNodeInfoWithGreaterTimeStarted); - - controlNode.close(); - } finally { - cluster.close(); - } + + ControlNodeInfo controlNodeInfo = buildControlNodeInfo(); + BootstrapNodeInfo bootstrapNodeInfo = buildBootstrapNodeInfo(); + + ControlNode controlNode = new ControlNode(controlNodeInfo, zkClient); + BootstrapNodeListener mockListener = mock(BootstrapNodeListener.class); + controlNode.addListener(mockListener); + controlNode.start(); + + BootstrapNode bootstrapNode = new BootstrapNode(bootstrapNodeInfo, zkClient); + bootstrapNode.start(); + timing.sleepABit(); + + verify(mockListener).onNodeAdded(bootstrapNodeInfo); + + BootstrapNodeInfo bootstrapNodeInfoWithGreaterTimeStarted = buildBootstrapNodeInfo(); + BootstrapNode bootstrapNodeWithGreaterTimeStarted = new BootstrapNode(bootstrapNodeInfoWithGreaterTimeStarted, zkClient); + + bootstrapNodeWithGreaterTimeStarted.start(); + timing.sleepABit(); + + bootstrapNode.close(); + timing.sleepABit(); + verify(mockListener, never()).onNodeRemoved(bootstrapNodeInfo); + + bootstrapNodeWithGreaterTimeStarted.close(); + timing.sleepABit(); + verify(mockListener).onNodeRemoved(bootstrapNodeInfoWithGreaterTimeStarted); + + controlNode.close(); } private RetryPolicy buildDefaultRetryPolicy() { diff --git a/server/common/zk/src/test/java/org/kaaproject/kaa/server/common/zk/ControlNodeIT.java b/server/common/zk/src/test/java/org/kaaproject/kaa/server/common/zk/ControlNodeIT.java index 9ff5f15762..bd2c1f4f31 100644 --- a/server/common/zk/src/test/java/org/kaaproject/kaa/server/common/zk/ControlNodeIT.java +++ b/server/common/zk/src/test/java/org/kaaproject/kaa/server/common/zk/ControlNodeIT.java @@ -28,9 +28,13 @@ import java.util.Random; import org.apache.curator.RetryPolicy; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.test.TestingCluster; import org.apache.curator.test.Timing; +import org.junit.After; +import org.junit.Before; import org.junit.Test; import org.kaaproject.kaa.server.common.zk.control.ControlNode; import org.kaaproject.kaa.server.common.zk.control.ControlNodeListener; @@ -45,95 +49,109 @@ public class ControlNodeIT { private static final String ENDPOINT_NODE_HOST = "192.168.0.101"; private static final String SECONDARY_NODE_HOST = "192.168.0.2"; private static final String CONTROL_NODE_HOST = "192.168.0.1"; + private CuratorFramework zkClient; + private TestingCluster cluster; - @Test - public void masterFailoverTest() throws Exception { - Timing timing = new Timing(); - TestingCluster cluster = new TestingCluster(3); - cluster.start(); + @Before + public void beforeTest() { + try { + cluster = new TestingCluster(3); + cluster.start(); + zkClient = CuratorFrameworkFactory.newClient(cluster.getConnectString(), buildDefaultRetryPolicy()); + zkClient.start(); + } catch (Exception e) { + System.err.println("Unable to initialize cluster before test! " + e); + } + } + + @After + public void afterTest() { try { - ControlNodeInfo controlNodeInfo = buildControlNodeInfo(); - - ControlNodeInfo secondaryNodeInfo = buildSecondaryNodeInfo(); - - OperationsNodeInfo endpointNodeInfo = buildOperationsNodeInfo(); - - OperationsNode endpointNode = new OperationsNode(endpointNodeInfo, cluster.getConnectString(), buildDefaultRetryPolicy()); - endpointNode.start(); - assertNull(endpointNode.getControlServerInfo()); - - ControlNode controlNode = new ControlNode(controlNodeInfo, cluster.getConnectString(), buildDefaultRetryPolicy()); - assertFalse(controlNode.isMaster()); - controlNode.start(); - ControlNode secondaryNode = new ControlNode(secondaryNodeInfo, cluster.getConnectString(), buildDefaultRetryPolicy()); - assertFalse(secondaryNode.isMaster()); - secondaryNode.start(); - timing.sleepABit(); - assertTrue(controlNode.isMaster()); - assertFalse(secondaryNode.isMaster()); - assertNotNull(endpointNode.getControlServerInfo()); - assertEquals(CONTROL_NODE_HOST, endpointNode.getControlServerInfo().getConnectionInfo().getThriftHost().toString()); - - controlNode.close(); - timing.sleepABit(); - - assertNotNull(endpointNode.getControlServerInfo()); - assertEquals(SECONDARY_NODE_HOST, endpointNode.getControlServerInfo().getConnectionInfo().getThriftHost().toString()); - secondaryNode.close(); - endpointNode.close(); - } finally { + zkClient.close(); cluster.close(); + } catch (Exception e) { + System.err.println("Unable to shutdown cluster after test! " + e); } } + @Test + public void masterFailoverTest() throws Exception { + Timing timing = new Timing(); + + ControlNodeInfo controlNodeInfo = buildControlNodeInfo(); + + ControlNodeInfo secondaryNodeInfo = buildSecondaryNodeInfo(); + + OperationsNodeInfo endpointNodeInfo = buildOperationsNodeInfo(); + + OperationsNode endpointNode = new OperationsNode(endpointNodeInfo, zkClient); + endpointNode.start(); + assertNull(endpointNode.getControlServerInfo()); + + ControlNode controlNode = new ControlNode(controlNodeInfo, zkClient); + assertFalse(controlNode.isMaster()); + controlNode.start(); + ControlNode secondaryNode = new ControlNode(secondaryNodeInfo, zkClient); + assertFalse(secondaryNode.isMaster()); + secondaryNode.start(); + timing.sleepABit(); + assertTrue(controlNode.isMaster()); + assertFalse(secondaryNode.isMaster()); + assertNotNull(endpointNode.getControlServerInfo()); + assertEquals(CONTROL_NODE_HOST, endpointNode.getControlServerInfo().getConnectionInfo().getThriftHost().toString()); + + controlNode.close(); + timing.sleepABit(); + + assertNotNull(endpointNode.getControlServerInfo()); + assertEquals(SECONDARY_NODE_HOST, endpointNode.getControlServerInfo().getConnectionInfo().getThriftHost().toString()); + secondaryNode.close(); + endpointNode.close(); + } + @Test public void masterListenerTest() throws Exception { Timing timing = new Timing(); - TestingCluster cluster = new TestingCluster(3); - cluster.start(); - try { - ControlNodeInfo controlNodeInfo = buildControlNodeInfo(); - ControlNodeInfo secondaryNodeInfo = buildSecondaryNodeInfo(); - OperationsNodeInfo endpointNodeInfo = buildOperationsNodeInfo(); - - OperationsNode endpointNode = new OperationsNode(endpointNodeInfo, cluster.getConnectString(), buildDefaultRetryPolicy()); - ControlNodeListener mockListener = mock(ControlNodeListener.class); - endpointNode.addListener(mockListener); - endpointNode.start(); - - ControlNode controlNode = new ControlNode(controlNodeInfo, cluster.getConnectString(), buildDefaultRetryPolicy()); - controlNode.start(); - ControlNode secondaryNode = new ControlNode(secondaryNodeInfo, cluster.getConnectString(), buildDefaultRetryPolicy()); - secondaryNode.start(); - timing.sleepABit(); - verify(mockListener).onControlNodeChange(controlNodeInfo); - - int random = new Random().nextInt(); - controlNodeInfo.setBootstrapServerCount(random); - controlNode.updateNodeData(controlNodeInfo); - - int random2 = new Random().nextInt(); - secondaryNodeInfo.setBootstrapServerCount(random2); - secondaryNode.updateNodeData(secondaryNodeInfo); - - timing.sleepABit(); - verify(mockListener).onControlNodeChange(controlNodeInfo); - assertEquals(new Integer(random), endpointNode.getControlServerInfo().getBootstrapServerCount()); - assertEquals(new Integer(random2), secondaryNode.getCurrentNodeInfo().getBootstrapServerCount()); - - controlNode.close(); - timing.sleepABit(); - verify(mockListener).onControlNodeDown(); - verify(mockListener).onControlNodeChange(secondaryNodeInfo); - - assertTrue(endpointNode.removeListener(mockListener)); - assertFalse(endpointNode.removeListener(mockListener)); - - secondaryNode.close(); - endpointNode.close(); - } finally { - cluster.close(); - } + + ControlNodeInfo controlNodeInfo = buildControlNodeInfo(); + ControlNodeInfo secondaryNodeInfo = buildSecondaryNodeInfo(); + OperationsNodeInfo endpointNodeInfo = buildOperationsNodeInfo(); + + OperationsNode endpointNode = new OperationsNode(endpointNodeInfo, zkClient); + ControlNodeListener mockListener = mock(ControlNodeListener.class); + endpointNode.addListener(mockListener); + endpointNode.start(); + + ControlNode controlNode = new ControlNode(controlNodeInfo, zkClient); + controlNode.start(); + ControlNode secondaryNode = new ControlNode(secondaryNodeInfo, zkClient); + secondaryNode.start(); + timing.sleepABit(); + verify(mockListener).onControlNodeChange(controlNodeInfo); + + int random = new Random().nextInt(); + controlNodeInfo.setBootstrapServerCount(random); + controlNode.updateNodeData(controlNodeInfo); + + int random2 = new Random().nextInt(); + secondaryNodeInfo.setBootstrapServerCount(random2); + secondaryNode.updateNodeData(secondaryNodeInfo); + + timing.sleepABit(); + verify(mockListener).onControlNodeChange(controlNodeInfo); + assertEquals(new Integer(random), endpointNode.getControlServerInfo().getBootstrapServerCount()); + assertEquals(new Integer(random2), secondaryNode.getCurrentNodeInfo().getBootstrapServerCount()); + + controlNode.close(); + timing.sleepABit(); + verify(mockListener).onControlNodeDown(); + verify(mockListener).onControlNodeChange(secondaryNodeInfo); + + assertTrue(endpointNode.removeListener(mockListener)); + assertFalse(endpointNode.removeListener(mockListener)); + + secondaryNode.close(); + endpointNode.close(); } private RetryPolicy buildDefaultRetryPolicy() { diff --git a/server/common/zk/src/test/java/org/kaaproject/kaa/server/common/zk/OperationsNodeIT.java b/server/common/zk/src/test/java/org/kaaproject/kaa/server/common/zk/OperationsNodeIT.java index 6b8c5efc90..cbc446b6f2 100644 --- a/server/common/zk/src/test/java/org/kaaproject/kaa/server/common/zk/OperationsNodeIT.java +++ b/server/common/zk/src/test/java/org/kaaproject/kaa/server/common/zk/OperationsNodeIT.java @@ -27,10 +27,14 @@ import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.test.TestingCluster; import org.apache.curator.test.Timing; +import org.apache.zookeeper.data.Stat; +import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import org.kaaproject.kaa.server.common.zk.bootstrap.BootstrapNode; import org.kaaproject.kaa.server.common.zk.gen.BootstrapNodeInfo; @@ -45,151 +49,160 @@ public class OperationsNodeIT { private static final int NEW_HTTP_ID = BootstrapNodeIT.HTTP_ID + 1; private static final String BOOTSTRAP_NODE_HOST = "192.168.0.202"; private static final String ENDPOINT_NODE_HOST = "192.168.0.101"; + private CuratorFramework zkClient; + private TestingCluster cluster; + + @Before + public void beforeTest() { + try { + cluster = new TestingCluster(3); + cluster.start(); + zkClient = CuratorFrameworkFactory.newClient(cluster.getConnectString(), buildDefaultRetryPolicy()); + zkClient.start(); + } catch (Exception e) { + System.err.println("Unable to initialize cluster before test! " + e); + } + } + + @After + public void afterTest() { + try { + zkClient.close(); + cluster.close(); + } catch (Exception e) { + System.err.println("Unable to shutdown cluster after test! " + e); + } + } @Test public void endpointListenerTest() throws Exception { Timing timing = new Timing(); - TestingCluster cluster = new TestingCluster(3); - cluster.start(); - try { - OperationsNodeInfo endpointNodeInfo = buildOperationsNodeInfo(); - BootstrapNodeInfo bootstrapNodeInfo = buildBootstrapNodeInfo(); + OperationsNodeInfo endpointNodeInfo = buildOperationsNodeInfo(); - BootstrapNode bootstrapNode = new BootstrapNode(bootstrapNodeInfo, cluster.getConnectString(), buildDefaultRetryPolicy()); - OperationsNodeListener mockListener = mock(OperationsNodeListener.class); - bootstrapNode.addListener(mockListener); - bootstrapNode.start(); + BootstrapNodeInfo bootstrapNodeInfo = buildBootstrapNodeInfo(); - OperationsNode endpointNode = new OperationsNode(endpointNodeInfo, cluster.getConnectString(), buildDefaultRetryPolicy()); + BootstrapNode bootstrapNode = new BootstrapNode(bootstrapNodeInfo, zkClient); + OperationsNodeListener mockListener = mock(OperationsNodeListener.class); + bootstrapNode.addListener(mockListener); + bootstrapNode.start(); - endpointNode.start(); + OperationsNode endpointNode = new OperationsNode(endpointNodeInfo, zkClient); - timing.sleepABit(); - verify(mockListener).onNodeAdded(endpointNodeInfo); + endpointNode.start(); - assertNotNull(bootstrapNode.getCurrentOperationServerNodes()); - assertEquals(1, bootstrapNode.getCurrentOperationServerNodes().size()); + timing.sleepABit(); + verify(mockListener).onNodeAdded(endpointNodeInfo); - OperationsNodeInfo testNodeInfo = bootstrapNode.getCurrentOperationServerNodes().get(0); - assertNotNull(testNodeInfo.getTransports()); - assertEquals(2, testNodeInfo.getTransports().size()); - assertNotNull(testNodeInfo.getTransports().get(0)); - assertEquals(BootstrapNodeIT.HTTP_ID, testNodeInfo.getTransports().get(0).getId().intValue()); - assertEquals(BootstrapNodeIT.TCP_ID, testNodeInfo.getTransports().get(1).getId().intValue()); - assertNotNull(testNodeInfo.getTransports().get(0).getConnectionInfo()); + assertNotNull(bootstrapNode.getCurrentOperationServerNodes()); + assertEquals(1, bootstrapNode.getCurrentOperationServerNodes().size()); - endpointNodeInfo.getTransports().get(0).setId(NEW_HTTP_ID); + OperationsNodeInfo testNodeInfo = bootstrapNode.getCurrentOperationServerNodes().get(0); + assertNotNull(testNodeInfo.getTransports()); + assertEquals(2, testNodeInfo.getTransports().size()); + assertNotNull(testNodeInfo.getTransports().get(0)); + assertEquals(BootstrapNodeIT.HTTP_ID, testNodeInfo.getTransports().get(0).getId().intValue()); + assertEquals(BootstrapNodeIT.TCP_ID, testNodeInfo.getTransports().get(1).getId().intValue()); + assertNotNull(testNodeInfo.getTransports().get(0).getConnectionInfo()); - endpointNode.updateNodeData(endpointNodeInfo); - timing.sleepABit(); - verify(mockListener).onNodeUpdated(endpointNodeInfo); + endpointNodeInfo.getTransports().get(0).setId(NEW_HTTP_ID); - assertNotNull(bootstrapNode.getCurrentOperationServerNodes()); - assertEquals(1, bootstrapNode.getCurrentOperationServerNodes().size()); - assertNotNull(bootstrapNode.getCurrentOperationServerNodes().get(0)); - testNodeInfo = bootstrapNode.getCurrentOperationServerNodes().get(0); - assertNotNull(testNodeInfo.getTransports()); - assertEquals(NEW_HTTP_ID, testNodeInfo.getTransports().get(0).getId().intValue()); + endpointNode.updateNodeData(endpointNodeInfo); + timing.sleepABit(); + verify(mockListener).onNodeUpdated(endpointNodeInfo); - endpointNode.close(); - timing.sleepABit(); - verify(mockListener).onNodeRemoved(endpointNodeInfo); - Assert.assertTrue(bootstrapNode.removeListener(mockListener)); - Assert.assertFalse(bootstrapNode.removeListener(mockListener)); - bootstrapNode.close(); - } finally { - cluster.close(); - } + assertNotNull(bootstrapNode.getCurrentOperationServerNodes()); + assertEquals(1, bootstrapNode.getCurrentOperationServerNodes().size()); + assertNotNull(bootstrapNode.getCurrentOperationServerNodes().get(0)); + testNodeInfo = bootstrapNode.getCurrentOperationServerNodes().get(0); + assertNotNull(testNodeInfo.getTransports()); + assertEquals(NEW_HTTP_ID, testNodeInfo.getTransports().get(0).getId().intValue()); + + endpointNode.close(); + timing.sleepABit(); + verify(mockListener).onNodeRemoved(endpointNodeInfo); + Assert.assertTrue(bootstrapNode.removeListener(mockListener)); + Assert.assertFalse(bootstrapNode.removeListener(mockListener)); + bootstrapNode.close(); } @Test public void endpointExceptionTest() throws Exception { - TestingCluster cluster = new TestingCluster(3); - cluster.start(); - try { - OperationsNodeInfo endpointNodeInfo = buildOperationsNodeInfo(); - - OperationsNode endpointNode = new OperationsNode(endpointNodeInfo, cluster.getConnectString(), buildDefaultRetryPolicy()); - endpointNode.start(); - endpointNode.doZKClientAction(new ControlNodeTracker.ZKClientAction() { - - @Override - public void doWithZkClient(CuratorFramework client) throws Exception { - throw new Exception("for test"); - } - }); - - Assert.assertFalse(endpointNode.isConnected()); - endpointNode.close(); - } finally { - cluster.close(); - } + OperationsNodeInfo endpointNodeInfo = buildOperationsNodeInfo(); + + OperationsNode endpointNode = new OperationsNode(endpointNodeInfo, zkClient); + endpointNode.start(); + endpointNode.doZKClientAction(new ControlNodeTracker.ZKClientAction() { + + @Override + public void doWithZkClient(CuratorFramework client) throws Exception { + throw new Exception("for test"); + } + }); + + //check if operations node changed and deleted child node + //after unexpected exception, as child node was ephemeral and connection lost + String OPERATIONS_SERVER_NODE_PATH = "/operationsServerNodes"; + Stat stat = zkClient.checkExists().forPath(OPERATIONS_SERVER_NODE_PATH); + int cversion = stat.getCversion(); + int numChildren = stat.getNumChildren(); + Assert.assertEquals(cversion, 2); + Assert.assertEquals(numChildren, 0); + endpointNode.close(); } @Test(expected = IOException.class) public void endpointIOExceptionTest() throws Exception { - TestingCluster cluster = new TestingCluster(3); - cluster.start(); - try { - OperationsNodeInfo endpointNodeInfo = buildOperationsNodeInfo(); + OperationsNodeInfo endpointNodeInfo = buildOperationsNodeInfo(); - OperationsNode endpointNode = new OperationsNode(endpointNodeInfo, cluster.getConnectString(), buildDefaultRetryPolicy()); - endpointNode.start(); - endpointNode.doZKClientAction(new ControlNodeTracker.ZKClientAction() { - @Override - public void doWithZkClient(CuratorFramework client) throws Exception { - throw new Exception("for test"); - } - }, true); + OperationsNode endpointNode = new OperationsNode(endpointNodeInfo, zkClient); + endpointNode.start(); + endpointNode.doZKClientAction(new ControlNodeTracker.ZKClientAction() { - Assert.assertFalse(endpointNode.isConnected()); - endpointNode.close(); - } finally { - cluster.close(); - } + @Override + public void doWithZkClient(CuratorFramework client) throws Exception { + throw new Exception("for test"); + } + }, true); + + Assert.assertFalse(endpointNode.isConnected()); + endpointNode.close(); } @Test public void outdatedRemovalTest() throws Exception { Timing timing = new Timing(); - TestingCluster cluster = new TestingCluster(3); - cluster.start(); - try { - OperationsNodeInfo endpointNodeInfo = buildOperationsNodeInfo(); - BootstrapNodeInfo bootstrapNodeInfo = buildBootstrapNodeInfo(); - BootstrapNode bootstrapNode = new BootstrapNode(bootstrapNodeInfo, cluster.getConnectString(), buildDefaultRetryPolicy()); - OperationsNodeListener mockListener = mock(OperationsNodeListener.class); - bootstrapNode.addListener(mockListener); - bootstrapNode.start(); + OperationsNodeInfo endpointNodeInfo = buildOperationsNodeInfo(); + BootstrapNodeInfo bootstrapNodeInfo = buildBootstrapNodeInfo(); - OperationsNode endpointNode = new OperationsNode(endpointNodeInfo, cluster.getConnectString(), buildDefaultRetryPolicy()); + BootstrapNode bootstrapNode = new BootstrapNode(bootstrapNodeInfo, zkClient); + OperationsNodeListener mockListener = mock(OperationsNodeListener.class); + bootstrapNode.addListener(mockListener); + bootstrapNode.start(); - endpointNode.start(); - timing.sleepABit(); - verify(mockListener).onNodeAdded(endpointNodeInfo); + OperationsNode endpointNode = new OperationsNode(endpointNodeInfo, zkClient); - OperationsNodeInfo endpointNodeInfoWithGreaterTimeStarted = buildOperationsNodeInfo(); - OperationsNode endpointNodeWithGreaterTimeStarted = new OperationsNode(endpointNodeInfoWithGreaterTimeStarted, - cluster.getConnectString(), buildDefaultRetryPolicy()); + endpointNode.start(); + timing.sleepABit(); + verify(mockListener).onNodeAdded(endpointNodeInfo); - endpointNodeWithGreaterTimeStarted.start(); - timing.sleepABit(); + OperationsNodeInfo endpointNodeInfoWithGreaterTimeStarted = buildOperationsNodeInfo(); + OperationsNode endpointNodeWithGreaterTimeStarted = new OperationsNode(endpointNodeInfoWithGreaterTimeStarted, zkClient); - endpointNode.close(); - timing.sleepABit(); - verify(mockListener, never()).onNodeRemoved(endpointNodeInfo); + endpointNodeWithGreaterTimeStarted.start(); + timing.sleepABit(); - endpointNodeWithGreaterTimeStarted.close(); - timing.sleepABit(); - verify(mockListener).onNodeRemoved(endpointNodeInfoWithGreaterTimeStarted); + endpointNode.close(); + timing.sleepABit(); + verify(mockListener, never()).onNodeRemoved(endpointNodeInfo); - bootstrapNode.close(); - } finally { - cluster.close(); - } + endpointNodeWithGreaterTimeStarted.close(); + timing.sleepABit(); + verify(mockListener).onNodeRemoved(endpointNodeInfoWithGreaterTimeStarted); + + bootstrapNode.close(); } private RetryPolicy buildDefaultRetryPolicy() { diff --git a/server/node/src/main/java/org/kaaproject/kaa/server/bootstrap/service/initialization/BootstrapInitializationService.java b/server/node/src/main/java/org/kaaproject/kaa/server/bootstrap/service/initialization/BootstrapInitializationService.java index 1c297c1ec9..392014a61e 100644 --- a/server/node/src/main/java/org/kaaproject/kaa/server/bootstrap/service/initialization/BootstrapInitializationService.java +++ b/server/node/src/main/java/org/kaaproject/kaa/server/bootstrap/service/initialization/BootstrapInitializationService.java @@ -21,7 +21,7 @@ import java.util.ArrayList; import java.util.List; -import org.apache.curator.retry.RetryUntilElapsed; +import org.apache.curator.framework.CuratorFramework; import org.kaaproject.kaa.server.bootstrap.service.OperationsServerListService; import org.kaaproject.kaa.server.bootstrap.service.security.KeyStoreService; import org.kaaproject.kaa.server.common.zk.bootstrap.BootstrapNode; @@ -62,6 +62,9 @@ public class BootstrapInitializationService extends AbstractInitializationServic @Autowired private KeyStoreService bootstrapKeyStoreService; + @Autowired + private CuratorFramework zkClient; + /* * (non-Javadoc) * @@ -143,7 +146,7 @@ private void startZK() throws Exception { // NOSONAR nodeInfo.setConnectionInfo(new ConnectionInfo(getNodeConfig().getThriftHost(), getNodeConfig().getThriftPort(), keyData)); nodeInfo.setTransports(new ArrayList()); nodeInfo.setTimeStarted(System.currentTimeMillis()); - bootstrapNode = new BootstrapNode(nodeInfo, getNodeConfig().getZkHostPortList(), new RetryUntilElapsed(getNodeConfig().getZkMaxRetryTime(), getNodeConfig().getZkSleepTime())); + bootstrapNode = new BootstrapNode(nodeInfo, zkClient); if (bootstrapNode != null) { bootstrapNode.start(); } diff --git a/server/node/src/main/java/org/kaaproject/kaa/server/control/service/zk/ControlZkService.java b/server/node/src/main/java/org/kaaproject/kaa/server/control/service/zk/ControlZkService.java index 40d04eb026..932b67dd90 100644 --- a/server/node/src/main/java/org/kaaproject/kaa/server/control/service/zk/ControlZkService.java +++ b/server/node/src/main/java/org/kaaproject/kaa/server/control/service/zk/ControlZkService.java @@ -20,7 +20,7 @@ import java.lang.reflect.InvocationTargetException; import java.util.List; -import org.apache.curator.retry.RetryUntilElapsed; +import org.apache.curator.framework.CuratorFramework; import org.apache.thrift.TException; import org.kaaproject.kaa.server.common.thrift.KaaThriftService; import org.kaaproject.kaa.server.common.thrift.gen.operations.Notification; @@ -52,6 +52,9 @@ public class ControlZkService { @Autowired private KaaNodeServerConfig kaaNodeServerConfig; + @Autowired + private CuratorFramework zkClient; + /** The control Zookeeper node. */ private ControlNode controlZKNode; @@ -73,8 +76,7 @@ public void start() { ControlNodeInfo nodeInfo = new ControlNodeInfo(); ConnectionInfo connectionInfo = new ConnectionInfo(getNodeConfig().getThriftHost(), getNodeConfig().getThriftPort(), null); nodeInfo.setConnectionInfo(connectionInfo); - controlZKNode = new ControlNode(nodeInfo, getNodeConfig().getZkHostPortList(), 60 * 1000, 3 * 1000, new RetryUntilElapsed( - getNodeConfig().getZkMaxRetryTime(), getNodeConfig().getZkSleepTime())); + controlZKNode = new ControlNode(nodeInfo, zkClient); try { controlZKNode.start(); } catch (Exception e) { diff --git a/server/node/src/main/java/org/kaaproject/kaa/server/node/service/initialization/KaaNodeInitializationService.java b/server/node/src/main/java/org/kaaproject/kaa/server/node/service/initialization/KaaNodeInitializationService.java index 0695dc1d66..1ddbec80cc 100644 --- a/server/node/src/main/java/org/kaaproject/kaa/server/node/service/initialization/KaaNodeInitializationService.java +++ b/server/node/src/main/java/org/kaaproject/kaa/server/node/service/initialization/KaaNodeInitializationService.java @@ -29,6 +29,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import org.apache.curator.framework.CuratorFramework; import org.apache.thrift.TMultiplexedProcessor; import org.apache.thrift.server.TServer; import org.apache.thrift.server.TThreadPoolServer; @@ -80,7 +81,10 @@ public class KaaNodeInitializationService extends AbstractInitializationService @Autowired @Lazy private InitializationService operationsInitializationService; - + + @Autowired + private CuratorFramework zkClient; + /* * (non-Javadoc) * @@ -100,6 +104,8 @@ public void start() { LOG.error("Interrupted while waiting for thrift to start...", e); } + waitZkConnection(); + if (getNodeConfig().isControlServiceEnabled()) { controlInitializationService.start(); } @@ -120,6 +126,18 @@ public void start() { } + private void waitZkConnection() { + if (!zkClient.isStarted()) { + zkClient.start(); + } + try { + LOG.info("Waiting connection to Zookeeper at ", getNodeConfig().getZkHostPortList()); + zkClient.blockUntilConnected(); + } catch (InterruptedException e) { + LOG.error("Zookeeper client was interrupted while waiting for connection! ", getNodeConfig().getZkHostPortList(), e); + } + } + /* * (non-Javadoc) * @@ -136,6 +154,7 @@ public void stop() { if (getNodeConfig().isOperationsServiceEnabled()) { operationsInitializationService.stop(); } + zkClient.close(); server.stop(); ThriftExecutor.shutdown(); @@ -282,5 +301,5 @@ public TServer createServer(TServerTransport serverTransport, TMultiplexedProces args.executorService = executorService; return new TThreadPoolServer(args); } - + } diff --git a/server/node/src/main/java/org/kaaproject/kaa/server/operations/service/initialization/OperationsInitializationService.java b/server/node/src/main/java/org/kaaproject/kaa/server/operations/service/initialization/OperationsInitializationService.java index d1fdcd9def..7bfbfb4747 100644 --- a/server/node/src/main/java/org/kaaproject/kaa/server/operations/service/initialization/OperationsInitializationService.java +++ b/server/node/src/main/java/org/kaaproject/kaa/server/operations/service/initialization/OperationsInitializationService.java @@ -21,7 +21,7 @@ import java.util.ArrayList; import java.util.List; -import org.apache.curator.retry.RetryUntilElapsed; +import org.apache.curator.framework.CuratorFramework; import org.kaaproject.kaa.server.common.zk.gen.ConnectionInfo; import org.kaaproject.kaa.server.common.zk.gen.LoadInfo; import org.kaaproject.kaa.server.common.zk.gen.OperationsNodeInfo; @@ -94,6 +94,9 @@ public class OperationsInitializationService extends AbstractInitializationServi @Autowired private LoadBalancingService loadBalancingService; + @Autowired + private CuratorFramework zkClient; + /** * OperationsServerConfig getter * @@ -192,8 +195,7 @@ private void startZK() { nodeInfo.setConnectionInfo(new ConnectionInfo(getNodeConfig().getThriftHost(), getNodeConfig().getThriftPort(), keyData)); nodeInfo.setLoadInfo(new LoadInfo(DEFAULT_LOAD_INDEX, 1.0)); nodeInfo.setTransports(new ArrayList()); - operationsNode = new OperationsNode(nodeInfo, getNodeConfig().getZkHostPortList(), new RetryUntilElapsed(getNodeConfig() - .getZkMaxRetryTime(), getNodeConfig().getZkSleepTime())); + operationsNode = new OperationsNode(nodeInfo, zkClient); try { operationsNode.start(); eventService.setZkNode(operationsNode); diff --git a/server/node/src/main/resources/kaaNodeContext.xml b/server/node/src/main/resources/kaaNodeContext.xml index 5a665d98f7..c68210d140 100644 --- a/server/node/src/main/resources/kaaNodeContext.xml +++ b/server/node/src/main/resources/kaaNodeContext.xml @@ -59,6 +59,20 @@ + + + + + + + + + + + + + + diff --git a/server/node/src/test/java/org/kaaproject/kaa/server/control/TestCluster.java b/server/node/src/test/java/org/kaaproject/kaa/server/control/TestCluster.java index 775368b7ee..83e3a11787 100644 --- a/server/node/src/test/java/org/kaaproject/kaa/server/control/TestCluster.java +++ b/server/node/src/test/java/org/kaaproject/kaa/server/control/TestCluster.java @@ -23,6 +23,8 @@ import java.util.ArrayList; import org.apache.curator.RetryPolicy; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.test.InstanceSpec; import org.apache.curator.test.TestingCluster; @@ -83,15 +85,14 @@ public static void checkStarted(KaaNodeInitializationService kaaNodeInitializati zkCluster.start(); BootstrapNodeInfo bootstrapNodeInfo = buildBootstrapNodeInfo(); + CuratorFramework zkClient = CuratorFrameworkFactory.newClient(zkCluster.getConnectString(), buildDefaultRetryPolicy()); - bootstrapNode = new BootstrapNode(bootstrapNodeInfo, - zkCluster.getConnectString(), buildDefaultRetryPolicy()); + bootstrapNode = new BootstrapNode(bootstrapNodeInfo, zkClient); bootstrapNode.start(); OperationsNodeInfo endpointNodeInfo = buildEndpointNodeInfo(); - endpointNode = new OperationsNode(endpointNodeInfo, - zkCluster.getConnectString(), buildDefaultRetryPolicy()); + endpointNode = new OperationsNode(endpointNodeInfo, zkClient); endpointNode.start(); kaaNodeInstance = kaaNodeInitializationService; diff --git a/server/node/src/test/java/org/kaaproject/kaa/server/operations/service/event/EventServiceThriftTestIT.java b/server/node/src/test/java/org/kaaproject/kaa/server/operations/service/event/EventServiceThriftTestIT.java index 1efa2938c1..3b66b6dda1 100644 --- a/server/node/src/test/java/org/kaaproject/kaa/server/operations/service/event/EventServiceThriftTestIT.java +++ b/server/node/src/test/java/org/kaaproject/kaa/server/operations/service/event/EventServiceThriftTestIT.java @@ -30,6 +30,8 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.RetryUntilElapsed; import org.apache.thrift.TMultiplexedProcessor; import org.apache.thrift.server.TServer; @@ -300,7 +302,8 @@ public void registerZK() { nodeInfo.setLoadInfo(new LoadInfo(1, 1.0)); nodeInfo.setTransports(new ArrayList()); String zkHostPortList = "localhost:" + ZK_PORT; - operationsNode = new OperationsNode(nodeInfo, zkHostPortList, new RetryUntilElapsed(3000, 1000)); + CuratorFramework zkClient = CuratorFrameworkFactory.newClient(zkHostPortList, new RetryUntilElapsed(3000, 1000)); + operationsNode = new OperationsNode(nodeInfo, zkClient); try { operationsNode.start(); eventService.setZkNode(operationsNode); diff --git a/server/node/src/test/java/org/kaaproject/kaa/server/operations/service/event/TestCluster.java b/server/node/src/test/java/org/kaaproject/kaa/server/operations/service/event/TestCluster.java index 8f2bc96ebd..b6435cfe2b 100644 --- a/server/node/src/test/java/org/kaaproject/kaa/server/operations/service/event/TestCluster.java +++ b/server/node/src/test/java/org/kaaproject/kaa/server/operations/service/event/TestCluster.java @@ -20,7 +20,10 @@ import java.util.ArrayList; import org.apache.curator.RetryPolicy; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.curator.retry.RetryUntilElapsed; import org.apache.curator.test.InstanceSpec; import org.apache.curator.test.TestingCluster; import org.kaaproject.kaa.server.common.zk.gen.ConnectionInfo; @@ -63,9 +66,8 @@ public static void checkStarted() throws Exception { zkCluster.start(); LOG.info("ZK Cluster started"); OperationsNodeInfo endpointNodeInfo = buildOperationsNodeInfo(); - - operationsNode = new OperationsNode(endpointNodeInfo, - zkCluster.getConnectString(), buildDefaultRetryPolicy()); + CuratorFramework zkClient = CuratorFrameworkFactory.newClient(zkCluster.getConnectString(), buildDefaultRetryPolicy()); + operationsNode = new OperationsNode(endpointNodeInfo, zkClient); operationsNode.start(); }