Skip to content

Commit

Permalink
KAA-1281 : block kaa-node startup if zookeeper is unavailable
Browse files Browse the repository at this point in the history
  • Loading branch information
nocs00 committed Sep 7, 2016
1 parent 0f7cca8 commit 3eb7232
Show file tree
Hide file tree
Showing 18 changed files with 89 additions and 18 deletions.
Expand Up @@ -118,7 +118,7 @@ public void startAndWait(String[] args) {
} }


LOG.info("{} application stopped.", getName()); LOG.info("{} application stopped.", getName());
}; }


/** /**
* Gets the name of the service. * Gets the name of the service.
Expand Down
Expand Up @@ -42,6 +42,9 @@ public class KaaNodeServerConfig {
/** The zk host port list. */ /** The zk host port list. */
private String zkHostPortList; private String zkHostPortList;


/** Time to connect to ZK. */
private int zkWaitConnectionTime;

/** The zk max retry time. */ /** The zk max retry time. */
private int zkMaxRetryTime; private int zkMaxRetryTime;


Expand Down Expand Up @@ -106,6 +109,15 @@ public String getZkHostPortList() {
return zkHostPortList; return zkHostPortList;
} }


/**
* Time to connect to ZK.
*
* @return the zkWaitConnectionTime
*/
public int getZkWaitConnectionTime() {
return zkWaitConnectionTime;
}

/** /**
* The zk max retry time. * The zk max retry time.
* *
Expand Down Expand Up @@ -189,6 +201,14 @@ public void setZkHostPortList(String zkHostPortList) {
this.zkHostPortList = zkHostPortList; this.zkHostPortList = zkHostPortList;
} }


/**
* @param zkWaitConnectionTime
* the zkWaitConnectionTime to set
*/
public void setZkWaitConnectionTime(int zkWaitConnectionTime) {
this.zkWaitConnectionTime = zkWaitConnectionTime;
}

/** /**
* @param zkMaxRetryTime * @param zkMaxRetryTime
* the zkMaxRetryTime to set * the zkMaxRetryTime to set
Expand Down
Expand Up @@ -39,6 +39,7 @@
import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransportException; import org.apache.thrift.transport.TTransportException;
import org.kaaproject.kaa.server.common.thrift.KaaThriftService; import org.kaaproject.kaa.server.common.thrift.KaaThriftService;
import org.kaaproject.kaa.server.common.thrift.cli.client.BaseCliThriftClient;
import org.kaaproject.kaa.server.common.thrift.gen.bootstrap.BootstrapThriftService; import org.kaaproject.kaa.server.common.thrift.gen.bootstrap.BootstrapThriftService;
import org.kaaproject.kaa.server.common.thrift.gen.node.KaaNodeThriftService; import org.kaaproject.kaa.server.common.thrift.gen.node.KaaNodeThriftService;
import org.kaaproject.kaa.server.common.thrift.gen.operations.OperationsThriftService; import org.kaaproject.kaa.server.common.thrift.gen.operations.OperationsThriftService;
Expand Down Expand Up @@ -104,20 +105,23 @@ public void start() {
LOG.error("Interrupted while waiting for thrift to start...", e); LOG.error("Interrupted while waiting for thrift to start...", e);
} }


waitZkConnection(); if (waitZkConnection()) {
if (getNodeConfig().isControlServiceEnabled()) {
controlInitializationService.start();
}
if (getNodeConfig().isBootstrapServiceEnabled()) {
bootstrapInitializationService.start();
}
if (getNodeConfig().isOperationsServiceEnabled()) {
operationsInitializationService.start();
}


if (getNodeConfig().isControlServiceEnabled()) { LOG.info("Kaa Node Server Started.");
controlInitializationService.start(); } else {
} LOG.error("Failed to connect to Zookeeper within {} minutes. Kaa Node Server will be stopped.", getNodeConfig().getZkWaitConnectionTime());
if (getNodeConfig().isBootstrapServiceEnabled()) { stopThrift();
bootstrapInitializationService.start();
} }
if (getNodeConfig().isOperationsServiceEnabled()) {
operationsInitializationService.start();
}

LOG.info("Kaa Node Server Started.");

try { try {
thriftShutdownLatch.await(); thriftShutdownLatch.await();
} catch (InterruptedException e) { } catch (InterruptedException e) {
Expand All @@ -126,15 +130,16 @@ public void start() {


} }


private void waitZkConnection() { private boolean waitZkConnection() {
if (!zkClient.isStarted()) { if (!zkClient.isStarted()) {
zkClient.start(); zkClient.start();
} }
try { try {
LOG.info("Waiting connection to Zookeeper at ", getNodeConfig().getZkHostPortList()); LOG.info("Waiting connection to Zookeeper at ", getNodeConfig().getZkHostPortList());
zkClient.blockUntilConnected(); return zkClient.blockUntilConnected(getNodeConfig().getZkWaitConnectionTime(), TimeUnit.MINUTES);
} catch (InterruptedException e) { } catch (InterruptedException e) {
LOG.error("Zookeeper client was interrupted while waiting for connection! ", getNodeConfig().getZkHostPortList(), e); LOG.error("Zookeeper client was interrupted while waiting for connection! ", getNodeConfig().getZkHostPortList(), e);
return false;
} }
} }


Expand Down Expand Up @@ -162,6 +167,22 @@ public void stop() {
LOG.info("Kaa Node Server Stopped."); LOG.info("Kaa Node Server Stopped.");
} }


/**
* Stop thrift.
*
*/
public void stopThrift() {
if (zkClient!=null) {
zkClient.close();
}
if (server!=null) {
server.stop();
}
ThriftExecutor.shutdown();

LOG.info("Kaa Node Server Stopped.");
}

/** /**
* Start thrift. * Start thrift.
* *
Expand Down
Expand Up @@ -160,6 +160,9 @@ public void onNotification(Notification notification) {
@PreDestroy @PreDestroy
public void preDestroy() { public void preDestroy() {
context.getEventService().removeListener(eventListener); context.getEventService().removeListener(eventListener);
akka.stop(ioRouter);
akka.stop(opsActor);
akka.terminate();
} }


@Override @Override
Expand Down
3 changes: 3 additions & 0 deletions server/node/src/main/resources/kaa-node.properties
Expand Up @@ -33,6 +33,9 @@ zk_enabled=true
# Zookeeper service url list. # Zookeeper service url list.
zk_host_port_list=localhost:2181 zk_host_port_list=localhost:2181


# Wait for connection time in minutes.
zk_wait_connection_time=5

# The max retry time in milliseconds. # The max retry time in milliseconds.
zk_max_retry_time=3000 zk_max_retry_time=3000


Expand Down
1 change: 1 addition & 0 deletions server/node/src/main/resources/kaaNodeContext.xml
Expand Up @@ -54,6 +54,7 @@
<property name="thriftPort" value="#{properties[thrift_port]}" /> <property name="thriftPort" value="#{properties[thrift_port]}" />
<property name="zkEnabled" value="#{properties[zk_enabled]}" /> <property name="zkEnabled" value="#{properties[zk_enabled]}" />
<property name="zkHostPortList" value="#{properties[zk_host_port_list]}" /> <property name="zkHostPortList" value="#{properties[zk_host_port_list]}" />
<property name="zkWaitConnectionTime" value="#{properties[zk_wait_connection_time]}" />
<property name="zkMaxRetryTime" value="#{properties[zk_max_retry_time]}" /> <property name="zkMaxRetryTime" value="#{properties[zk_max_retry_time]}" />
<property name="zkSleepTime" value="#{properties[zk_sleep_time]}" /> <property name="zkSleepTime" value="#{properties[zk_sleep_time]}" />
<property name="zkIgnoreErrors" value="#{properties[zk_ignore_errors]}" /> <property name="zkIgnoreErrors" value="#{properties[zk_ignore_errors]}" />
Expand Down
Expand Up @@ -16,8 +16,11 @@


package org.kaaproject.kaa.server.node.service.initialization; package org.kaaproject.kaa.server.node.service.initialization;


import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.stubVoid; import static org.mockito.Mockito.when;


import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;
import org.apache.thrift.TMultiplexedProcessor; import org.apache.thrift.TMultiplexedProcessor;
Expand All @@ -30,9 +33,12 @@
import org.kaaproject.kaa.server.common.thrift.gen.node.KaaNodeThriftService; import org.kaaproject.kaa.server.common.thrift.gen.node.KaaNodeThriftService;
import org.kaaproject.kaa.server.common.thrift.gen.operations.OperationsThriftService; import org.kaaproject.kaa.server.common.thrift.gen.operations.OperationsThriftService;
import org.kaaproject.kaa.server.node.service.config.KaaNodeServerConfig; import org.kaaproject.kaa.server.node.service.config.KaaNodeServerConfig;
import org.mockito.Mock;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.springframework.test.util.ReflectionTestUtils; import org.springframework.test.util.ReflectionTestUtils;


import java.util.concurrent.TimeUnit;

public class KaaNodeInitializationServiceTest { public class KaaNodeInitializationServiceTest {


private InitializationService controlInitializationService; private InitializationService controlInitializationService;
Expand Down Expand Up @@ -129,6 +135,7 @@ private KaaNodeInitializationService kaaNodeInitializationServiceSpy() throws Ex
kaaNodeServerConfig.setControlServiceEnabled(true); kaaNodeServerConfig.setControlServiceEnabled(true);
kaaNodeServerConfig.setBootstrapServiceEnabled(true); kaaNodeServerConfig.setBootstrapServiceEnabled(true);
kaaNodeServerConfig.setOperationsServiceEnabled(true); kaaNodeServerConfig.setOperationsServiceEnabled(true);
kaaNodeServerConfig.setZkWaitConnectionTime(5);


ReflectionTestUtils.setField(kaaNodeInitializationService, "kaaNodeServerConfig", kaaNodeServerConfig); ReflectionTestUtils.setField(kaaNodeInitializationService, "kaaNodeServerConfig", kaaNodeServerConfig);


Expand All @@ -145,8 +152,9 @@ private KaaNodeInitializationService kaaNodeInitializationServiceSpy() throws Ex
bootstrapInitializationService = mock(InitializationService.class); bootstrapInitializationService = mock(InitializationService.class);
operationsInitializationService = mock(InitializationService.class); operationsInitializationService = mock(InitializationService.class);
zkClient = mock(CuratorFramework.class); zkClient = mock(CuratorFramework.class);
stubVoid(zkClient).toReturn().on().start(); doNothing().when(zkClient).start();
stubVoid(zkClient).toReturn().on().blockUntilConnected(); doNothing().when(zkClient).blockUntilConnected();
when(zkClient.blockUntilConnected(anyInt(), any(TimeUnit.class))).thenReturn(true);


ReflectionTestUtils.setField(kaaNodeInitializationService, "controlInitializationService", controlInitializationService); ReflectionTestUtils.setField(kaaNodeInitializationService, "controlInitializationService", controlInitializationService);


Expand Down
Expand Up @@ -2,6 +2,7 @@ thrift_host=localhost
thrift_port=19092 thrift_port=19092
zk_enabled=true zk_enabled=true
zk_host_port_list=localhost:2185 zk_host_port_list=localhost:2185
zk_wait_connection_time=5
zk_max_retry_time=3000 zk_max_retry_time=3000
zk_sleep_time=1000 zk_sleep_time=1000
zk_ignore_errors=true zk_ignore_errors=true
Expand Down
Expand Up @@ -43,6 +43,7 @@
<property name="thriftPort" value="#{properties[thrift_port]}" /> <property name="thriftPort" value="#{properties[thrift_port]}" />
<property name="zkEnabled" value="#{properties[zk_enabled]}" /> <property name="zkEnabled" value="#{properties[zk_enabled]}" />
<property name="zkHostPortList" value="#{properties[zk_host_port_list]}" /> <property name="zkHostPortList" value="#{properties[zk_host_port_list]}" />
<property name="zkWaitConnectionTime" value="#{properties[zk_wait_connection_time]}" />
<property name="zkMaxRetryTime" value="#{properties[zk_max_retry_time]}" /> <property name="zkMaxRetryTime" value="#{properties[zk_max_retry_time]}" />
<property name="zkSleepTime" value="#{properties[zk_sleep_time]}" /> <property name="zkSleepTime" value="#{properties[zk_sleep_time]}" />
<property name="zkIgnoreErrors" value="#{properties[zk_ignore_errors]}" /> <property name="zkIgnoreErrors" value="#{properties[zk_ignore_errors]}" />
Expand Down
1 change: 1 addition & 0 deletions server/node/src/test/resources/common-test-context.xml
Expand Up @@ -51,6 +51,7 @@
<property name="thriftPort" value="#{properties[thrift_port]}" /> <property name="thriftPort" value="#{properties[thrift_port]}" />
<property name="zkEnabled" value="#{properties[zk_enabled]}" /> <property name="zkEnabled" value="#{properties[zk_enabled]}" />
<property name="zkHostPortList" value="#{properties[zk_host_port_list]}" /> <property name="zkHostPortList" value="#{properties[zk_host_port_list]}" />
<property name="zkWaitConnectionTime" value="#{properties[zk_wait_connection_time]}" />
<property name="zkMaxRetryTime" value="#{properties[zk_max_retry_time]}" /> <property name="zkMaxRetryTime" value="#{properties[zk_max_retry_time]}" />
<property name="zkSleepTime" value="#{properties[zk_sleep_time]}" /> <property name="zkSleepTime" value="#{properties[zk_sleep_time]}" />
<property name="zkIgnoreErrors" value="#{properties[zk_ignore_errors]}" /> <property name="zkIgnoreErrors" value="#{properties[zk_ignore_errors]}" />
Expand Down
1 change: 1 addition & 0 deletions server/node/src/test/resources/common-zk-test-context.xml
Expand Up @@ -51,6 +51,7 @@
<property name="thriftPort" value="#{properties[thrift_port]}" /> <property name="thriftPort" value="#{properties[thrift_port]}" />
<property name="zkEnabled" value="#{properties[zk_enabled]}" /> <property name="zkEnabled" value="#{properties[zk_enabled]}" />
<property name="zkHostPortList" value="#{properties[zk_host_port_list]}" /> <property name="zkHostPortList" value="#{properties[zk_host_port_list]}" />
<property name="zkWaitConnectionTime" value="#{properties[zk_wait_connection_time]}" />
<property name="zkMaxRetryTime" value="#{properties[zk_max_retry_time]}" /> <property name="zkMaxRetryTime" value="#{properties[zk_max_retry_time]}" />
<property name="zkSleepTime" value="#{properties[zk_sleep_time]}" /> <property name="zkSleepTime" value="#{properties[zk_sleep_time]}" />
<property name="zkIgnoreErrors" value="#{properties[zk_ignore_errors]}" /> <property name="zkIgnoreErrors" value="#{properties[zk_ignore_errors]}" />
Expand Down
Expand Up @@ -49,6 +49,7 @@
<property name="thriftPort" value="#{properties[thrift_port]}" /> <property name="thriftPort" value="#{properties[thrift_port]}" />
<property name="zkEnabled" value="#{properties[zk_enabled]}" /> <property name="zkEnabled" value="#{properties[zk_enabled]}" />
<property name="zkHostPortList" value="#{properties[zk_host_port_list]}" /> <property name="zkHostPortList" value="#{properties[zk_host_port_list]}" />
<property name="zkWaitConnectionTime" value="#{properties[zk_wait_connection_time]}" />
<property name="zkMaxRetryTime" value="#{properties[zk_max_retry_time]}" /> <property name="zkMaxRetryTime" value="#{properties[zk_max_retry_time]}" />
<property name="zkSleepTime" value="#{properties[zk_sleep_time]}" /> <property name="zkSleepTime" value="#{properties[zk_sleep_time]}" />
<property name="zkIgnoreErrors" value="#{properties[zk_ignore_errors]}" /> <property name="zkIgnoreErrors" value="#{properties[zk_ignore_errors]}" />
Expand Down
Expand Up @@ -9,6 +9,7 @@ admin_port=9080


zk_enabled=true zk_enabled=true
zk_host_port_list=localhost:2185 zk_host_port_list=localhost:2185
zk_wait_connection_time=5
zk_max_retry_time=3000 zk_max_retry_time=3000
zk_sleep_time=1000 zk_sleep_time=1000
zk_ignore_errors=true zk_ignore_errors=true
Expand Down
1 change: 1 addition & 0 deletions server/node/src/test/resources/kaa-node-test.properties
Expand Up @@ -8,6 +8,7 @@ thrift_port=10090
admin_port=9080 admin_port=9080
zk_enabled=true zk_enabled=true
zk_host_port_list=localhost:2185 zk_host_port_list=localhost:2185
zk_wait_connection_time=5
zk_max_retry_time=3000 zk_max_retry_time=3000
zk_sleep_time=1000 zk_sleep_time=1000
zk_ignore_errors=true zk_ignore_errors=true
Expand Down
1 change: 1 addition & 0 deletions server/node/src/test/resources/kaa-node-zk-test.properties
Expand Up @@ -8,6 +8,7 @@ thrift_port=10090
admin_port=9080 admin_port=9080
zk_enabled=false zk_enabled=false
zk_host_port_list=localhost:2185 zk_host_port_list=localhost:2185
zk_wait_connection_time=5
zk_max_retry_time=3000 zk_max_retry_time=3000
zk_sleep_time=1000 zk_sleep_time=1000
zk_ignore_errors=true zk_ignore_errors=true
Expand Down
Expand Up @@ -47,6 +47,7 @@
<property name="thriftPort" value="#{properties[thrift_port]}" /> <property name="thriftPort" value="#{properties[thrift_port]}" />
<property name="zkEnabled" value="#{properties[zk_enabled]}" /> <property name="zkEnabled" value="#{properties[zk_enabled]}" />
<property name="zkHostPortList" value="#{properties[zk_host_port_list]}" /> <property name="zkHostPortList" value="#{properties[zk_host_port_list]}" />
<property name="zkWaitConnectionTime" value="#{properties[zk_wait_connection_time]}" />
<property name="zkMaxRetryTime" value="#{properties[zk_max_retry_time]}" /> <property name="zkMaxRetryTime" value="#{properties[zk_max_retry_time]}" />
<property name="zkSleepTime" value="#{properties[zk_sleep_time]}" /> <property name="zkSleepTime" value="#{properties[zk_sleep_time]}" />
<property name="zkIgnoreErrors" value="#{properties[zk_ignore_errors]}" /> <property name="zkIgnoreErrors" value="#{properties[zk_ignore_errors]}" />
Expand Down
Expand Up @@ -19,6 +19,9 @@ zk_enabled=false
# Zookeeper service url list # Zookeeper service url list
zk_host_port_list=localhost:2181 zk_host_port_list=localhost:2181


# Wait for connection time in milliseconds.
zk_wait_connection_time=5

# The max retry time in milliseconds # The max retry time in milliseconds
zk_max_retry_time=3000 zk_max_retry_time=3000


Expand Down
Expand Up @@ -19,6 +19,9 @@ zk_enabled=true
# Zookeeper service url list # Zookeeper service url list
zk_host_port_list=localhost:2185 zk_host_port_list=localhost:2185


# Wait for connection time in milliseconds.
zk_wait_connection_time=5

# The max retry time in milliseconds # The max retry time in milliseconds
zk_max_retry_time=3000 zk_max_retry_time=3000


Expand Down

0 comments on commit 3eb7232

Please sign in to comment.